You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2010/01/31 01:31:57 UTC
svn commit: r904934 [7/11] - in /qpid/trunk/qpid/java:
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/
broker/ broker/src/main/java/org/apache/qpid/qmf/ br...
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Sun Jan 31 00:31:49 2010
@@ -20,50 +20,16 @@
*/
package org.apache.qpid.server.protocol;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.security.Principal;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicMarkableReference;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.management.JMException;
-import javax.security.sasl.SaslServer;
-
import org.apache.log4j.Logger;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
+
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.codec.AMQDecoder;
import org.apache.qpid.common.ClientProperties;
-import org.apache.qpid.framing.AMQBody;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.AMQProtocolHeaderException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.HeartbeatBody;
-import org.apache.qpid.framing.MethodDispatcher;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.ProtocolInitiation;
-import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.*;
import org.apache.qpid.pool.Job;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.protocol.AMQConstant;
@@ -71,6 +37,10 @@
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.configuration.ConfiguredObject;
+import org.apache.qpid.server.configuration.ConnectionConfig;
+import org.apache.qpid.server.configuration.ConnectionConfigType;
import org.apache.qpid.server.handler.ServerMethodDispatcherImpl;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
@@ -85,12 +55,31 @@
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.transport.NetworkDriver;
import org.apache.qpid.transport.Sender;
-public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocolSession
+import javax.management.JMException;
+import javax.security.sasl.SaslServer;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig
{
private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
@@ -161,6 +150,8 @@
private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
private long _maxFrameSize;
private final AtomicBoolean _closing = new AtomicBoolean(false);
+ private final UUID _id;
+ private final ConfigStore _configStore;
public ManagedObject getManagedObject()
{
@@ -181,6 +172,10 @@
_logSubject = new ConnectionLogSubject(this);
+ _configStore = virtualHostRegistry.getConfigStore();
+ _id = _configStore.createId();
+
+
_actor.message(ConnectionMessages.CON_OPEN(null, null, false, false));
}
@@ -765,6 +760,8 @@
public void closeProtocolSession()
{
+ getConfigStore().removeConfiguredObject(this);
+
_networkDriver.close();
try
{
@@ -902,6 +899,8 @@
_virtualHost = virtualHost;
_virtualHost.getConnectionRegistry().registerConnection(this);
+
+ _configStore.addConfiguredObject(this);
try
{
@@ -1067,4 +1066,69 @@
}
}
+ public Boolean isIncoming()
+ {
+ return true;
+ }
+
+ public Boolean isSystemConnection()
+ {
+ return false;
+ }
+
+ public Boolean isFederationLink()
+ {
+ return false;
+ }
+
+ public String getAuthId()
+ {
+ return getAuthorizedID().getName();
+ }
+
+ public Integer getRemotePID()
+ {
+ return null;
+ }
+
+ public String getRemoteProcessName()
+ {
+ return null;
+ }
+
+ public Integer getRemoteParentPID()
+ {
+ return null;
+ }
+
+ public ConfigStore getConfigStore()
+ {
+ return _configStore;
+ }
+
+ public ConnectionConfigType getConfigType()
+ {
+ return ConnectionConfigType.getInstance();
+ }
+
+ public ConfiguredObject getParent()
+ {
+ return getVirtualHost();
+ }
+
+ public boolean isDurable()
+ {
+ return false;
+ }
+
+ public UUID getId()
+ {
+ return _id;
+ }
+
+ public String getAddress()
+ {
+ return String.valueOf(getRemoteAddress());
+ }
+
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java Sun Jan 31 00:31:49 2010
@@ -37,8 +37,19 @@
*/
package org.apache.qpid.server.protocol;
-import java.util.Date;
-import java.util.List;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.management.common.mbeans.ManagedConnection;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.ManagementActor;
+import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.ManagedObject;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -55,22 +66,8 @@
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.management.common.mbeans.ManagedConnection;
-import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
-import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.ManagementActor;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.RootMessageLogger;
-import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.ManagedObject;
+import java.util.Date;
+import java.util.List;
/**
* This MBean class implements the management interface. In order to make more attributes, operations and notifications
@@ -255,7 +252,7 @@
Object[] itemValues =
{
channel.getChannelId(), channel.isTransactional(),
- (channel.getDefaultQueue() != null) ? channel.getDefaultQueue().getName().asString() : null,
+ (channel.getDefaultQueue() != null) ? channel.getDefaultQueue().getNameShortString().asString() : null,
channel.getUnacknowledgedMessageMap().size(), channel.getBlocking()
};
@@ -291,7 +288,7 @@
// then the CurrentActor could be set in our JMX Proxy object.
// As it is we need to set the CurrentActor on all MBean methods
// Ideally we would not have a single method that can be called from
- // two contexts.
+ // two contexts.
boolean removeActor = false;
if (CurrentActor.get() == null)
{
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java Sun Jan 31 00:31:49 2010
@@ -20,14 +20,14 @@
*/
package org.apache.qpid.server.protocol;
+import org.apache.log4j.Logger;
+
import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.transport.NetworkDriver;
-import org.apache.qpid.transport.Connection;
-import org.apache.qpid.transport.ConnectionDelegate;
import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory.VERSION;
+import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.transport.ServerConnection;
-import org.apache.log4j.Logger;
+import org.apache.qpid.transport.ConnectionDelegate;
+import org.apache.qpid.transport.NetworkDriver;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
@@ -239,10 +239,10 @@
final ConnectionDelegate connDelegate =
new org.apache.qpid.server.transport.ServerConnectionDelegate(_appRegistry, _fqdn);
- Connection conn = new ServerConnection();
+ ServerConnection conn = new ServerConnection();
conn.setConnectionDelegate(connDelegate);
- return new ProtocolEngine_0_10( conn, _networkDriver);
+ return new ProtocolEngine_0_10( conn, _networkDriver, _appRegistry);
}
};
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java Sun Jan 31 00:31:49 2010
@@ -26,23 +26,34 @@
import org.apache.qpid.transport.network.InputHandler;
import org.apache.qpid.transport.network.Assembler;
import org.apache.qpid.transport.network.Disassembler;
+import org.apache.qpid.server.configuration.*;
+import org.apache.qpid.server.transport.ServerConnection;
+import org.apache.qpid.server.registry.IApplicationRegistry;
import java.net.SocketAddress;
+import java.util.UUID;
-public class ProtocolEngine_0_10 extends InputHandler implements ProtocolEngine
+public class ProtocolEngine_0_10 extends InputHandler implements ProtocolEngine, ConnectionConfig
{
public static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
private NetworkDriver _networkDriver;
private long _readBytes;
private long _writtenBytes;
- private Connection _connection;
-
- public ProtocolEngine_0_10(Connection conn, NetworkDriver networkDriver)
+ private ServerConnection _connection;
+ private final UUID _id;
+ private final IApplicationRegistry _appRegistry;
+
+ public ProtocolEngine_0_10(ServerConnection conn,
+ NetworkDriver networkDriver,
+ final IApplicationRegistry appRegistry)
{
super(new Assembler(conn));
_connection = conn;
+ _connection.setConnectionConfig(this);
_networkDriver = networkDriver;
+ _id = appRegistry.getConfigStore().createId();
+ _appRegistry = appRegistry;
}
public void setNetworkDriver(NetworkDriver driver)
@@ -50,6 +61,14 @@
_networkDriver = driver;
Disassembler dis = new Disassembler(driver, MAX_FRAME_SIZE);
_connection.setSender(dis);
+ _connection.onOpen(new Runnable()
+ {
+ public void run()
+ {
+ getConfigStore().addConfiguredObject(ProtocolEngine_0_10.this);
+ }
+ });
+
}
public SocketAddress getRemoteAddress()
@@ -81,4 +100,81 @@
{
//Todo
}
+
+ public VirtualHostConfig getVirtualHost()
+ {
+ return _connection.getVirtualHost();
+ }
+
+ public String getAddress()
+ {
+ return getRemoteAddress().toString();
+ }
+
+ public Boolean isIncoming()
+ {
+ return true;
+ }
+
+ public Boolean isSystemConnection()
+ {
+ return false;
+ }
+
+ public Boolean isFederationLink()
+ {
+ return false;
+ }
+
+ public String getAuthId()
+ {
+ return _connection.getAuthorizationID();
+ }
+
+ public String getRemoteProcessName()
+ {
+ return null;
+ }
+
+ public Integer getRemotePID()
+ {
+ return null;
+ }
+
+ public Integer getRemoteParentPID()
+ {
+ return null;
+ }
+
+ public ConfigStore getConfigStore()
+ {
+ return _appRegistry.getConfigStore();
+ }
+
+ public UUID getId()
+ {
+ return _id;
+ }
+
+ public ConnectionConfigType getConfigType()
+ {
+ return ConnectionConfigType.getInstance();
+ }
+
+ public ConfiguredObject getParent()
+ {
+ return getVirtualHost();
+ }
+
+ public boolean isDurable()
+ {
+ return false;
+ }
+
+ @Override
+ public void closed()
+ {
+ super.closed();
+ getConfigStore().removeConfiguredObject(this);
+ }
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java Sun Jan 31 00:31:49 2010
@@ -21,9 +21,11 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.subscription.SubscriptionList;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionList;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.util.Map;
public class AMQPriorityQueue extends SimpleAMQQueue
{
@@ -32,18 +34,18 @@
final AMQShortString owner,
final boolean autoDelete,
final VirtualHost virtualHost,
- int priorities)
+ int priorities, Map<String, Object> arguments)
{
- super(name, durable, owner, autoDelete, virtualHost, new PriorityQueueList.Factory(priorities));
+ super(name, durable, owner, autoDelete, virtualHost, new PriorityQueueList.Factory(priorities),arguments);
}
public AMQPriorityQueue(String queueName,
boolean durable,
String owner,
boolean autoDelete,
- VirtualHost virtualHost, int priorities)
+ VirtualHost virtualHost, int priorities, Map<String,Object> arguments)
{
- this(new AMQShortString(queueName), durable, new AMQShortString(owner),autoDelete,virtualHost,priorities);
+ this(queueName == null ? null : new AMQShortString(queueName), durable, owner == null ? null : new AMQShortString(owner),autoDelete,virtualHost,priorities, arguments);
}
public int getPriorities()
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Sun Jan 31 00:31:49 2010
@@ -20,44 +20,48 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.management.Managable;
-import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.configuration.QueueConfig;
import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeReferrer;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.management.Managable;
+import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.security.PrincipalHolder;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.AMQException;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.List;
-import java.util.Set;
import java.util.Map;
+import java.util.Set;
-public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeReferrer, TransactionLogResource
+public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeReferrer, TransactionLogResource, BaseQueue,
+ QueueConfig
{
boolean getDeleteOnNoConsumers();
void setDeleteOnNoConsumers(boolean b);
+ void addBinding(Binding binding);
+
+ void removeBinding(Binding binding);
+
+ List<Binding> getBindings();
+
+ int getBindingCount();
public interface Context
{
QueueEntry getLastSeenEntry();
}
- AMQShortString getName();
-
void setNoLocal(boolean b);
- boolean isDurable();
-
boolean isAutoDelete();
AMQShortString getOwner();
@@ -69,14 +73,6 @@
VirtualHost getVirtualHost();
-
- void bind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException;
-
- void unBind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException;
-
- List<ExchangeBinding> getExchangeBindings();
-
-
void registerSubscription(final Subscription subscription, final boolean exclusive) throws AMQException;
void unregisterSubscription(final Subscription subscription) throws AMQException;
@@ -86,6 +82,8 @@
int getActiveConsumerCount();
+ boolean hasExclusiveSubscriber();
+
boolean isUnused();
boolean isEmpty();
@@ -107,8 +105,6 @@
int delete() throws AMQException;
- QueueEntry enqueue(ServerMessage message) throws AMQException;
-
void requeue(QueueEntry entry);
void requeue(QueueEntryImpl storeContext, Subscription subscription);
@@ -122,6 +118,8 @@
void addQueueDeleteTask(final Task task);
+ void removeQueueDeleteTask(final Task task);
+
List<QueueEntry> getMessagesOnTheQueue();
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java Sun Jan 31 00:31:49 2010
@@ -23,16 +23,21 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.configuration.QueueConfiguration;
import java.util.Map;
+import java.util.HashMap;
public class AMQQueueFactory
{
public static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities");
+ private static final String QPID_LVQ_KEY = "qpid.LVQ_key";
+ private static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue";
+ private static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key";
+
private abstract static class QueueProperty
{
@@ -130,21 +135,60 @@
boolean autoDelete,
VirtualHost virtualHost, final FieldTable arguments)
{
- final int priorities = arguments == null ? 1 : arguments.containsKey(X_QPID_PRIORITIES) ? arguments.getInteger(X_QPID_PRIORITIES) : 1;
+ return createAMQQueueImpl(name == null ? null : name.toString(),
+ durable,
+ owner == null ? null : owner.toString(),
+ autoDelete,
+ virtualHost,
+ FieldTable.convertToMap(arguments));
+ }
- AMQQueue q = null;
- if(priorities > 1)
+
+ public static AMQQueue createAMQQueueImpl(String queueName,
+ boolean durable,
+ String owner,
+ boolean autoDelete,
+ VirtualHost virtualHost, Map<String, Object> arguments)
+ {
+ int priorities = 1;
+ String conflationKey = null;
+ if(arguments != null)
{
- q = new AMQPriorityQueue(name, durable, owner, autoDelete, virtualHost, priorities);
+ if(arguments.containsKey(QPID_LAST_VALUE_QUEUE) || arguments.containsKey(QPID_LAST_VALUE_QUEUE_KEY))
+ {
+ conflationKey = (String) arguments.get(QPID_LAST_VALUE_QUEUE_KEY);
+ if(conflationKey == null)
+ {
+ conflationKey = QPID_LVQ_KEY;
+ }
+ }
+ else if(arguments.containsKey(X_QPID_PRIORITIES))
+ {
+ Object prioritiesObj = arguments.get(X_QPID_PRIORITIES);
+ if(prioritiesObj instanceof Number)
+ {
+ priorities = ((Number)prioritiesObj).intValue();
+ }
+ }
+ }
+
+ AMQQueue q;
+ if(conflationKey != null)
+ {
+ q = new ConflationQueue(queueName, durable, owner, autoDelete, virtualHost, arguments, conflationKey);
+ }
+ else if(priorities > 1)
+ {
+ q = new AMQPriorityQueue(queueName, durable, owner, autoDelete, virtualHost, priorities, arguments);
}
else
{
- q = new SimpleAMQQueue(name, durable, owner, autoDelete, virtualHost);
+ q = new SimpleAMQQueue(queueName, durable, owner, autoDelete, virtualHost, arguments);
}
//Register the new queue
virtualHost.getQueueRegistry().registerQueue(q);
- q.configure(virtualHost.getConfiguration().getQueueConfiguration(name.asString()));
+ q.configure(virtualHost.getConfiguration().getQueueConfiguration(queueName));
if(arguments != null)
{
@@ -158,29 +202,43 @@
}
return q;
+
}
+
public static AMQQueue createAMQQueueImpl(QueueConfiguration config, VirtualHost host) throws AMQException
{
- AMQShortString queueName = new AMQShortString(config.getName());
+ String queueName = config.getName();
boolean durable = config.getDurable();
boolean autodelete = config.getAutoDelete();
- AMQShortString owner = (config.getOwner() != null) ? new AMQShortString(config.getOwner()) : null;
- FieldTable arguments = null;
- boolean priority = config.getPriority();
- int priorities = config.getPriorities();
- if(priority || priorities > 0)
+ String owner = config.getOwner();
+ Map<String,Object> arguments = null;
+ if(config.isLVQ() || config.getLVQKey() != null)
{
if(arguments == null)
{
- arguments = new FieldTable();
+ arguments = new HashMap<String,Object>();
}
- if (priorities < 0)
+ arguments.put(QPID_LAST_VALUE_QUEUE, 1);
+ arguments.put(QPID_LAST_VALUE_QUEUE_KEY, config.getLVQKey() == null ? QPID_LVQ_KEY : config.getLVQKey());
+ }
+ else
+ {
+ boolean priority = config.getPriority();
+ int priorities = config.getPriorities();
+ if(priority || priorities > 0)
{
- priorities = 10;
+ if(arguments == null)
+ {
+ arguments = new HashMap<String,Object>();
+ }
+ if (priorities < 0)
+ {
+ priorities = 10;
+ }
+ arguments.put("x-qpid-priorities", priorities);
}
- arguments.put(new AMQShortString("x-qpid-priorities"), priorities);
}
AMQQueue q = createAMQQueueImpl(queueName, durable, owner, autodelete, host, arguments);
@@ -188,38 +246,4 @@
return q;
}
- public static AMQQueue createAMQQueueImpl(String queueName,
- boolean durable,
- String owner,
- boolean autoDelete,
- VirtualHost virtualHost, Map<String, Object> arguments)
- throws AMQException
- {
- int priorities = 1;
- if(arguments != null && arguments.containsKey(X_QPID_PRIORITIES))
- {
- Object prioritiesObj = arguments.get(X_QPID_PRIORITIES);
- if(prioritiesObj instanceof Number)
- {
- priorities = ((Number)prioritiesObj).intValue();
- }
- }
-
-
- AMQQueue q = null;
- if(priorities > 1)
- {
- q = new AMQPriorityQueue(queueName, durable, owner, autoDelete, virtualHost, priorities);
- }
- else
- {
- q = new SimpleAMQQueue(queueName, durable, owner, autoDelete, virtualHost);
- }
-
- //Register the new queue
- virtualHost.getQueueRegistry().registerQueue(q);
- q.configure(virtualHost.getConfiguration().getQueueConfiguration(queueName));
- return q;
-
- }
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Sun Jan 31 00:31:49 2010
@@ -95,7 +95,7 @@
{
super(ManagedQueue.class, ManagedQueue.TYPE);
_queue = queue;
- _queueName = jmxEncode(new StringBuffer(queue.getName()), 0).toString();
+ _queueName = jmxEncode(new StringBuffer(queue.getNameShortString()), 0).toString();
}
public ManagedObject getParentObject()
@@ -252,7 +252,7 @@
{
throw new IllegalArgumentException("Capacity must not be less than FlowResumeCapacity");
}
-
+
_queue.setCapacity(capacity);
}
@@ -267,10 +267,10 @@
{
throw new IllegalArgumentException("FlowResumeCapacity must not exceed Capacity");
}
-
+
_queue.setFlowResumeCapacity(flowResumeCapacity);
}
-
+
public boolean isFlowOverfull()
{
return _queue.isOverfull();
@@ -309,7 +309,7 @@
public void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg)
{
// important : add log to the log file - monitoring tools may be looking for this
- _logger.info(notification.name() + " On Queue " + queue.getName() + " - " + notificationMsg);
+ _logger.info(notification.name() + " On Queue " + queue.getNameShortString() + " - " + notificationMsg);
notificationMsg = notification.name() + " " + notificationMsg;
_lastNotification =
@@ -509,7 +509,7 @@
private String[] getMessageTransferMessageHeaderProps(MessageTransferMessage msg)
{
List<String> list = new ArrayList<String>();
-
+
AMQMessageHeader header = msg.getMessageHeader();
MessageProperties msgProps = msg.getHeader().get(MessageProperties.class);
Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java?rev=904934&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java Sun Jan 31 00:31:49 2010
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+
+public interface BaseQueue extends TransactionLogResource
+{
+ public static interface PostEnqueueAction
+ {
+ public void onEnqueue(QueueEntry entry);
+ }
+
+ void enqueue(ServerMessage message) throws AMQException;
+ void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException;
+
+ boolean isDurable();
+
+ AMQShortString getNameShortString();
+}
Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java?rev=904934&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java Sun Jan 31 00:31:49 2010
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.util.Map;
+
+/**
+ * A {@link SimpleAMQQueue} whose entries are held in a {@link ConflationQueueList}.
+ */
+public class ConflationQueue extends SimpleAMQQueue
+{
+    /**
+     * Builds the queue, delegating everything to the superclass and supplying a
+     * {@link ConflationQueueList.Factory} as the entry-list factory.
+     *
+     * @param name          queue name
+     * @param durable       durability flag passed to the superclass
+     * @param owner         queue owner passed to the superclass
+     * @param autoDelete    auto-delete flag passed to the superclass
+     * @param virtualHost   virtual host the queue belongs to
+     * @param args          queue arguments passed to the superclass
+     * @param conflationKey key handed to the {@link ConflationQueueList.Factory}
+     */
+    protected ConflationQueue(String name, boolean durable, String owner, boolean autoDelete,
+                              VirtualHost virtualHost, Map<String, Object> args, String conflationKey)
+    {
+        super(name, durable, owner, autoDelete, virtualHost,
+              new ConflationQueueList.Factory(conflationKey), args);
+    }
+}
Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java?rev=904934&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java Sun Jan 31 00:31:49 2010
@@ -0,0 +1,168 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.message.ServerMessage;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A queue entry list that conflates messages: for every distinct value of the configured
+ * message header it tracks the latest entry, and entries superseded by a later one are
+ * dequeued and discarded.
+ */
+public class ConflationQueueList extends SimpleQueueEntryList
+{
+
+    /** Name of the message header whose value groups entries that supersede one another. */
+    private final String _conflationKey;
+
+    /** Maps each header value seen so far to a reference holding the latest entry for it. */
+    private final ConcurrentHashMap<Object, AtomicReference<QueueEntry>> _latestValuesMap =
+        new ConcurrentHashMap<Object, AtomicReference<QueueEntry>>();
+
+    /**
+     * @param queue         the queue this list belongs to
+     * @param conflationKey name of the header to conflate on
+     */
+    public ConflationQueueList(AMQQueue queue, String conflationKey)
+    {
+        super(queue);
+        _conflationKey = conflationKey;
+    }
+
+    /** Creates {@link ConflationQueueEntry} instances so that {@link #add} can track them. */
+    @Override
+    protected ConflationQueueEntry createQueueEntry(ServerMessage message)
+    {
+        return new ConflationQueueEntry(this, message);
+    }
+
+
+    /**
+     * Appends the message to the list and, when it carries the conflation header, makes the
+     * new entry the latest for that header value (discarding the entry it supersedes) or
+     * discards the new entry if a later one is already registered.
+     *
+     * @param message the message to add
+     * @return the entry created for the message
+     */
+    @Override
+    public QueueEntry add(final ServerMessage message)
+    {
+        ConflationQueueEntry entry = (ConflationQueueEntry) (super.add(message));
+        AtomicReference<QueueEntry> latestValueReference = null;
+
+        Object value = message.getMessageHeader().getHeader(_conflationKey);
+        if(value != null)
+        {
+            latestValueReference = _latestValuesMap.get(value);
+            if(latestValueReference == null)
+            {
+                // Another thread may register a reference first; re-read to use the winner.
+                _latestValuesMap.putIfAbsent(value, new AtomicReference<QueueEntry>(entry));
+                latestValueReference = _latestValuesMap.get(value);
+            }
+            QueueEntry oldEntry;
+
+            // Retry until either the reference already holds an entry that is not older
+            // than ours, or we have swapped ours in.
+            do
+            {
+                oldEntry = latestValueReference.get();
+            }
+            while(oldEntry.compareTo(entry) < 0 && !latestValueReference.compareAndSet(oldEntry, entry));
+
+            if(oldEntry.compareTo(entry) < 0)
+            {
+                // We replaced some other entry to become the newest value.
+                // discardEntry() acquires the entry itself. Acquiring it here as well
+                // (as this code used to) makes that second acquire fail - assuming
+                // acquire() only succeeds on an available entry - leaving the
+                // superseded entry acquired but never dequeued.
+                discardEntry(oldEntry);
+            }
+            else if (oldEntry.compareTo(entry) > 0)
+            {
+                // A newer entry came along
+                discardEntry(entry);
+
+            }
+        }
+
+        entry.setLatestValueReference(latestValueReference);
+        return entry;
+    }
+
+    /**
+     * Dequeues the entry in an auto-commit transaction and discards it after commit,
+     * provided the entry can be acquired; otherwise does nothing.
+     */
+    private void discardEntry(final QueueEntry entry)
+    {
+        if(entry.acquire())
+        {
+            ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog());
+            txn.dequeue(entry.getQueue(),entry.getMessage(),
+                        new ServerTransaction.Action()
+                        {
+                            public void postCommit()
+                            {
+                                entry.discard();
+                            }
+
+                            public void onRollback()
+                            {
+                                // nothing to undo: the entry simply remains acquired
+                            }
+                        });
+        }
+    }
+
+    /**
+     * Queue entry that remembers the "latest value" reference for its conflation key so
+     * that it can discard itself on release once it has been superseded.
+     */
+    private final class ConflationQueueEntry extends QueueEntryImpl
+    {
+
+        /** Reference for this entry's header value; null when the header was absent. */
+        private AtomicReference<QueueEntry> _latestValueReference;
+
+        public ConflationQueueEntry(SimpleQueueEntryList queueEntryList, ServerMessage message)
+        {
+            super(queueEntryList, message);
+        }
+
+        /**
+         * Releases the entry and then discards it if another entry has meanwhile become
+         * the latest one for the same header value.
+         */
+        @Override
+        public void release()
+        {
+            super.release();
+
+            if(_latestValueReference != null)
+            {
+                if(_latestValueReference.get() != this)
+                {
+                    discardEntry(this);
+                }
+            }
+
+        }
+
+        public void setLatestValueReference(final AtomicReference<QueueEntry> latestValueReference)
+        {
+            _latestValueReference = latestValueReference;
+        }
+    }
+
+    /** Factory producing {@link ConflationQueueList} instances for a fixed conflation key. */
+    static class Factory implements QueueEntryListFactory
+    {
+        private final String _conflationKey;
+
+        Factory(String conflationKey)
+        {
+            _conflationKey = conflationKey;
+        }
+
+        public QueueEntryList createQueueEntryList(AMQQueue queue)
+        {
+            return new ConflationQueueList(queue, _conflationKey);
+        }
+    }
+
+}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java Sun Jan 31 00:31:49 2010
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -46,7 +45,7 @@
public void registerQueue(AMQQueue queue)
{
- _queueMap.put(queue.getName(), queue);
+ _queueMap.put(queue.getNameShortString(), queue);
}
public void unregisterQueue(AMQShortString name)
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java Sun Jan 31 00:31:49 2010
@@ -63,7 +63,7 @@
* delivered. It is <b>cleared after delivery has been attempted</b>. Any persistent record of destinations is done
* by the message handle.
*/
- private ArrayList<AMQQueue> _destinationQueues;
+ private ArrayList<? extends BaseQueue> _destinationQueues;
private long _expiration;
@@ -131,7 +131,7 @@
}
- public ArrayList<AMQQueue> getDestinationQueues()
+ public ArrayList<? extends BaseQueue> getDestinationQueues()
{
return _destinationQueues;
}
@@ -225,7 +225,7 @@
}
- public void enqueue(final ArrayList<AMQQueue> queues)
+ public void enqueue(final ArrayList<? extends BaseQueue> queues)
{
_destinationQueues = queues;
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Sun Jan 31 00:31:49 2010
@@ -28,6 +28,7 @@
{
+
public static enum State
{
AVAILABLE,
@@ -163,6 +164,8 @@
boolean expired() throws AMQException;
+ boolean isAvailable();
+
boolean isAcquired();
boolean acquire();
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Sun Jan 31 00:31:49 2010
@@ -20,20 +20,23 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.log4j.Logger;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
public class QueueEntryImpl implements QueueEntry
@@ -154,6 +157,11 @@
}
+ public boolean isAvailable()
+ {
+ return _state == AVAILABLE_STATE;
+ }
+
public boolean isAcquired()
{
return _state.getState() == State.ACQUIRED;
@@ -408,7 +416,7 @@
if(alternateExchange != null)
{
- final List<AMQQueue> rerouteQueues = alternateExchange.route(new InboundMessageAdapter(this));
+ final List<? extends BaseQueue> rerouteQueues = alternateExchange.route(new InboundMessageAdapter(this));
final ServerMessage message = getMessage();
if(rerouteQueues != null && rerouteQueues.size() != 0)
{
@@ -419,9 +427,9 @@
{
try
{
- for(AMQQueue queue : rerouteQueues)
+ for(BaseQueue queue : rerouteQueues)
{
- QueueEntry entry = queue.enqueue(message);
+ queue.enqueue(message);
}
}
catch (AMQException e)
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Sun Jan 31 00:31:49 2010
@@ -1,39 +1,51 @@
package org.apache.qpid.server.queue;
-import java.util.*;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import javax.management.JMException;
-
import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.pool.ReadWriteRunnable;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.configuration.ConfiguredObject;
+import org.apache.qpid.server.configuration.QueueConfigType;
import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.QueueActor;
+import org.apache.qpid.server.logging.messages.QueueMessages;
+import org.apache.qpid.server.logging.subjects.QueueLogSubject;
import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.security.PrincipalHolder;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionList;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.security.PrincipalHolder;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.QueueActor;
-import org.apache.qpid.server.logging.subjects.QueueLogSubject;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.messages.QueueMessages;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import javax.management.JMException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
/*
*
@@ -81,7 +93,7 @@
private Exchange _alternateExchange;
/** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */
- private final ExchangeBindings _bindings = new ExchangeBindings(this);
+
protected final QueueEntryList _entries;
@@ -102,8 +114,15 @@
private final AtomicLong _totalMessagesReceived = new AtomicLong();
+ private final AtomicLong _dequeueCount = new AtomicLong();
+ private final AtomicLong _dequeueSize = new AtomicLong();
+ private final AtomicLong _enqueueSize = new AtomicLong();
+ private final AtomicLong _persistentMessageEnqueueSize = new AtomicLong();
+ private final AtomicLong _persistentMessageDequeueSize = new AtomicLong();
+ private final AtomicLong _persistentMessageEnqueueCount = new AtomicLong();;
+ private final AtomicLong _persistentMessageDequeueCount = new AtomicLong();
-
+ private final AtomicInteger _bindingCountHigh = new AtomicInteger();
/** max allowed size(KB) of a single message */
public long _maximumMessageSize = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageSize();
@@ -151,18 +170,38 @@
private final AtomicBoolean _overfull = new AtomicBoolean(false);
private boolean _deleteOnNoConsumers;
+ private final CopyOnWriteArrayList<Binding> _bindings = new CopyOnWriteArrayList<Binding>();
+ private UUID _id;
+ private final Map<String, Object> _arguments;
+
+ //TODO
+ private long _createTime = System.currentTimeMillis();
+
- protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
+ protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost, Map<String,Object> arguments)
{
- this(name, durable, owner, autoDelete, virtualHost, new SimpleQueueEntryList.Factory());
+ this(name, durable, owner, autoDelete, virtualHost, new SimpleQueueEntryList.Factory(),arguments);
}
+ public SimpleAMQQueue(String queueName, boolean durable, String owner, boolean autoDelete, VirtualHost virtualHost, Map<String, Object> arguments)
+ {
+ this(queueName, durable, owner,autoDelete,virtualHost,new SimpleQueueEntryList.Factory(),arguments);
+ }
+
+ public SimpleAMQQueue(String queueName, boolean durable, String owner, boolean autoDelete, VirtualHost virtualHost, QueueEntryListFactory entryListFactory, Map<String, Object> arguments)
+ {
+ this(queueName == null ? null : new AMQShortString(queueName), durable, owner == null ? null : new AMQShortString(owner),autoDelete,virtualHost,entryListFactory, arguments);
+ }
+
+
+
protected SimpleAMQQueue(AMQShortString name,
boolean durable,
AMQShortString owner,
boolean autoDelete,
VirtualHost virtualHost,
- QueueEntryListFactory entryListFactory)
+ QueueEntryListFactory entryListFactory,
+ Map<String,Object> arguments)
{
if (name == null)
@@ -182,6 +221,9 @@
_autoDelete = autoDelete;
_virtualHost = virtualHost;
_entries = entryListFactory.createQueueEntryList(this);
+ _arguments = arguments;
+
+ _id = virtualHost.getConfigStore().createId();
_asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
@@ -208,6 +250,8 @@
durable, !durable,
priorities > 0));
+ getConfigStore().addConfiguredObject(this);
+
try
{
_managedObject = new AMQQueueMBean(this);
@@ -222,12 +266,6 @@
}
- public SimpleAMQQueue(String queueName, boolean durable, String owner, boolean autoDelete, VirtualHost virtualHost)
- throws AMQException
- {
- this(new AMQShortString(queueName), durable, owner == null ? null : new AMQShortString(owner),autoDelete,virtualHost);
- }
-
public void resetNotifications()
{
// This ensure that the notification checks for the configured alerts are created.
@@ -244,7 +282,7 @@
_asyncDelivery.execute(runnable);
}
- public AMQShortString getName()
+ public AMQShortString getNameShortString()
{
return _name;
}
@@ -254,6 +292,21 @@
_nolocal = nolocal;
}
+ public UUID getId()
+ {
+ return _id;
+ }
+
+ public QueueConfigType getConfigType()
+ {
+ return QueueConfigType.getInstance();
+ }
+
+ public ConfiguredObject getParent()
+ {
+ return getVirtualHost();
+ }
+
public boolean isDurable()
{
return _durable;
@@ -284,7 +337,7 @@
public Map<String, Object> getArguments()
{
- return null;
+ return _arguments;
}
public boolean isAutoDelete()
@@ -313,56 +366,9 @@
return _virtualHost;
}
- // ------ bind and unbind
-
- public void bind(Exchange exchange, String bindingKey, Map<String, Object> arguments) throws AMQException
- {
-
- FieldTable fieldTable = FieldTable.convertToFieldTable(arguments);
- AMQShortString routingKey = new AMQShortString(bindingKey);
-
- exchange.registerQueue(routingKey, this, fieldTable);
-
- if (isDurable() && exchange.isDurable())
- {
-
- _virtualHost.getDurableConfigurationStore().bindQueue(exchange, routingKey, this, fieldTable);
- }
-
- _bindings.addBinding(routingKey, fieldTable, exchange);
- }
-
-
- public void bind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException
- {
-
- exchange.registerQueue(routingKey, this, arguments);
- if (isDurable() && exchange.isDurable())
- {
- _virtualHost.getDurableConfigurationStore().bindQueue(exchange, routingKey, this, arguments);
- }
-
- _bindings.addBinding(routingKey, arguments, exchange);
- }
-
- public void unBind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException
+ public String getName()
{
- exchange.deregisterQueue(routingKey, this, arguments);
- if (isDurable() && exchange.isDurable())
- {
- _virtualHost.getDurableConfigurationStore().unbindQueue(exchange, routingKey, this, arguments);
- }
-
- boolean removed = _bindings.remove(routingKey, arguments, exchange);
- if (!removed)
- {
- _logger.error("Mismatch between queue bindings and exchange record of bindings");
- }
- }
-
- public List<ExchangeBinding> getExchangeBindings()
- {
- return new ArrayList<ExchangeBinding>(_bindings.getExchangeBindings());
+ return getNameShortString().toString();
}
// ------ Manage Subscriptions
@@ -370,7 +376,7 @@
public synchronized void registerSubscription(final Subscription subscription, final boolean exclusive) throws AMQException
{
- if (isExclusiveSubscriber())
+ if (hasExclusiveSubscriber())
{
throw new ExistingExclusiveSubscription();
}
@@ -459,17 +465,54 @@
_deleteOnNoConsumers = b;
}
+ /**
+  * Adds the binding to this queue's binding list and, if the list is now larger than
+  * the recorded high-water mark, raises the mark to the new size.
+  *
+  * @param binding the binding to record
+  */
+ public void addBinding(final Binding binding)
+ {
+     _bindings.add(binding);
+     final int currentCount = _bindings.size();
+
+     // Retry while the recorded peak is still below the size we observed; stop as soon
+     // as our update lands or another thread has recorded a value at least as large.
+     for (int peak = _bindingCountHigh.get();
+          currentCount > peak;
+          peak = _bindingCountHigh.get())
+     {
+         if (_bindingCountHigh.compareAndSet(peak, currentCount))
+         {
+             return;
+         }
+     }
+ }
+
+ public int getBindingCountHigh()
+ {
+ return _bindingCountHigh.get();
+ }
+
+ public void removeBinding(final Binding binding)
+ {
+ _bindings.remove(binding);
+ }
+
+ public List<Binding> getBindings()
+ {
+ return Collections.unmodifiableList(_bindings);
+ }
+
+ public int getBindingCount()
+ {
+ return getBindings().size();
+ }
// ------ Enqueue / Dequeue
+ public void enqueue(ServerMessage message) throws AMQException
+ {
+ enqueue(message, null);
+ }
- public QueueEntry enqueue(ServerMessage message) throws AMQException
+ public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
{
incrementQueueCount();
incrementQueueSize(message);
-
_totalMessagesReceived.incrementAndGet();
+
QueueEntry entry;
Subscription exclusiveSub = _exclusiveSubscriber;
@@ -554,7 +597,11 @@
_managedObject.checkForNotification(entry.getMessage());
}
- return entry;
+ if(action != null)
+ {
+ action.onEnqueue(entry);
+ }
+
}
private void deliverToSubscription(final Subscription sub, final QueueEntry entry)
@@ -596,7 +643,14 @@
private void incrementQueueSize(final ServerMessage message)
{
- getAtomicQueueSize().addAndGet(message.getSize());
+ long size = message.getSize();
+ getAtomicQueueSize().addAndGet(size);
+ _enqueueSize.addAndGet(size);
+ if(message.isPersistent() && isDurable())
+ {
+ _persistentMessageEnqueueSize.addAndGet(size);
+ _persistentMessageEnqueueCount.incrementAndGet();
+ }
}
private void incrementQueueCount()
@@ -654,7 +708,7 @@
SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
// iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
- while (subscriberIter.advance())
+ while (subscriberIter.advance() && entry.isAvailable())
{
Subscription sub = subscriberIter.getNode().getSubscription();
@@ -702,12 +756,21 @@
private void decrementQueueSize(final QueueEntry entry)
{
- getAtomicQueueSize().addAndGet(-entry.getMessage().getSize());
+ final ServerMessage message = entry.getMessage();
+ long size = message.getSize();
+ getAtomicQueueSize().addAndGet(-size);
+ _dequeueSize.addAndGet(size);
+ if(message.isPersistent() && isDurable())
+ {
+ _persistentMessageDequeueSize.addAndGet(size);
+ _persistentMessageDequeueCount.incrementAndGet();
+ }
}
void decrementQueueCount()
{
getAtomicQueueCount().decrementAndGet();
+ _dequeueCount.incrementAndGet();
}
public boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException
@@ -834,7 +897,7 @@
public int compareTo(final AMQQueue o)
{
- return _name.compareTo(o.getName());
+ return _name.compareTo(o.getNameShortString());
}
public AtomicInteger getAtomicQueueCount()
@@ -847,7 +910,7 @@
return _atomicQueueSize;
}
- private boolean isExclusiveSubscriber()
+ public boolean hasExclusiveSubscriber()
{
return _exclusiveSubscriber != null;
}
@@ -1099,6 +1162,45 @@
}
+ /**
+  * Removes messages from this queue.
+  *
+  * @param request {@code 0} clears the queue via {@code clearQueue()}; a positive value
+  *                dequeues, within a single local transaction, up to that many entries
+  *                that are not deleted and can be acquired; a negative value is ignored
+  */
+ public void purge(final long request)
+ {
+     if (request < 0L)
+     {
+         return;
+     }
+
+     if (request == 0L)
+     {
+         clearQueue();
+         return;
+     }
+
+     final QueueEntryIterator entryIterator = _entries.iterator();
+     final ServerTransaction txn = new LocalTransaction(getVirtualHost().getTransactionLog());
+     long purged = 0;
+
+     // Stop either when the requested number has been dequeued or the list is exhausted.
+     while (purged != request && entryIterator.advance())
+     {
+         final QueueEntry candidate = entryIterator.getNode();
+         if (candidate.isDeleted() || !candidate.acquire())
+         {
+             continue;
+         }
+
+         dequeueEntry(candidate, txn);
+         purged++;
+     }
+
+     txn.commit();
+ }
+
+ public long getCreateTime()
+ {
+ return _createTime;
+ }
+
// ------ Management functions
public void deleteMessageFromTop()
@@ -1172,11 +1274,21 @@
_deleteTaskList.add(task);
}
+ public void removeQueueDeleteTask(final Task task)
+ {
+ _deleteTaskList.remove(task);
+ }
+
public int delete() throws AMQException
{
if (!_deleted.getAndSet(true))
{
+ for(Binding b : getBindings())
+ {
+ _virtualHost.getBindingFactory().removeBinding(b);
+ }
+
SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
while (subscriptionIter.advance())
@@ -1188,8 +1300,8 @@
}
}
- _bindings.deregister();
_virtualHost.getQueueRegistry().unregisterQueue(_name);
+ getConfigStore().removeConfiguredObject(this);
List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
{
@@ -1214,7 +1326,7 @@
for(final QueueEntry entry : entries)
{
adapter.setEntry(entry);
- final List<AMQQueue> rerouteQueues = _alternateExchange.route(adapter);
+ final List<? extends BaseQueue> rerouteQueues = _alternateExchange.route(adapter);
final ServerMessage message = entry.getMessage();
if(rerouteQueues != null & rerouteQueues.size() != 0)
{
@@ -1226,9 +1338,9 @@
{
try
{
- for(AMQQueue queue : rerouteQueues)
+ for(BaseQueue queue : rerouteQueues)
{
- QueueEntry entry = queue.enqueue(message);
+ queue.enqueue(message);
}
}
catch (AMQException e)
@@ -1479,7 +1591,7 @@
// next entry they are interested in yet. This would lead to holding on to references to expired messages, etc
// which would give us memory "leak".
- if (!isExclusiveSubscriber())
+ if (!hasExclusiveSubscriber())
{
advanceAllSubscriptions();
}
@@ -1820,7 +1932,7 @@
public void setFlowResumeCapacity(long flowResumeCapacity)
{
_flowResumeCapacity = flowResumeCapacity;
-
+
checkCapacity();
}
@@ -1919,9 +2031,50 @@
}
+ public ConfigStore getConfigStore()
+ {
+ return getVirtualHost().getConfigStore();
+ }
+
+ public long getMessageDequeueCount()
+ {
+ return _dequeueCount.get();
+ }
+
+ public long getTotalEnqueueSize()
+ {
+ return _enqueueSize.get();
+ }
+
+ public long getTotalDequeueSize()
+ {
+ return _dequeueSize.get();
+ }
+
+ public long getPersistentByteEnqueues()
+ {
+ return _persistentMessageEnqueueSize.get();
+ }
+
+ public long getPersistentByteDequeues()
+ {
+ return _persistentMessageDequeueSize.get();
+ }
+
+ public long getPersistentMsgEnqueues()
+ {
+ return _persistentMessageEnqueueCount.get();
+ }
+
+ public long getPersistentMsgDequeues()
+ {
+ return _persistentMessageDequeueCount.get();
+ }
+
+
@Override
public String toString()
{
- return String.valueOf(getName());
+ return String.valueOf(getNameShortString());
}
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java Sun Jan 31 00:31:49 2010
@@ -61,11 +61,9 @@
{
_deletes.incrementAndGet();
QueueEntryImpl head = _head.nextNode();
- boolean deleted = head.isDeleted();
while(head._next != null && head.isDeleted())
{
- deleted = true;
final QueueEntryImpl newhead = head.nextNode();
if(newhead != null)
{
@@ -77,11 +75,6 @@
head = _head.nextNode();
}
- if(!deleted)
- {
- deleted = true;
- }
-
if(_deletes.get() > 1000L)
{
_deletes.set(0L);
@@ -135,7 +128,7 @@
public QueueEntry add(ServerMessage message)
{
- QueueEntryImpl node = new QueueEntryImpl(this, message);
+ QueueEntryImpl node = createQueueEntry(message);
for (;;)
{
QueueEntryImpl tail = _tail;
@@ -160,6 +153,11 @@
}
}
+ protected QueueEntryImpl createQueueEntry(ServerMessage message)
+ {
+ return new QueueEntryImpl(this, message);
+ }
+
public QueueEntry next(QueueEntry node)
{
return ((QueueEntryImpl)node).getNext();
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java Sun Jan 31 00:31:49 2010
@@ -20,24 +20,33 @@
*/
package org.apache.qpid.server.registry;
-import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.util.Map;
-
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
+
+import org.apache.qpid.qmf.QMFService;
+import org.apache.qpid.server.configuration.BrokerConfig;
+import org.apache.qpid.server.configuration.ConfigStore;
import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.configuration.SystemConfig;
+import org.apache.qpid.server.configuration.SystemConfigImpl;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.BrokerMessages;
import org.apache.qpid.server.management.ManagedObjectRegistry;
import org.apache.qpid.server.plugins.PluginManager;
import org.apache.qpid.server.security.access.ACLManager;
import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.transport.QpidAcceptor;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-import org.apache.qpid.server.logging.RootMessageLogger;
-import org.apache.qpid.server.logging.messages.BrokerMessages;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.transport.QpidAcceptor;
+
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
/**
* An abstract application registry that provides access to configuration information and handles the
@@ -75,6 +84,14 @@
protected RootMessageLogger _rootMessageLogger;
+ protected UUID _brokerId = UUID.randomUUID();
+
+ protected QMFService _qmfService;
+
+ private BrokerConfig _broker;
+
+ private ConfigStore _configStore;
+
static
{
Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownService()));
@@ -100,6 +117,16 @@
_logger.info("Initialising Application Registry:" + instanceID);
_instanceMap.put(instanceID, instance);
+ final ConfigStore store = ConfigStore.newInstance();
+ store.setRoot(new SystemConfigImpl(store));
+ instance.setConfigStore(store);
+
+ BrokerConfig broker = new BrokerConfigAdapter(instance, instanceID);
+
+ SystemConfig system = (SystemConfig) store.getRoot();
+ system.addBroker(broker);
+ instance.setBroker(broker);
+
try
{
instance.initialise(instanceID);
@@ -107,7 +134,14 @@
catch (Exception e)
{
_instanceMap.remove(instanceID);
- throw e;
+ try
+ {
+ system.removeBroker(broker);
+ }
+ finally
+ {
+ throw e;
+ }
}
}
else
@@ -116,6 +150,16 @@
}
}
+ public ConfigStore getConfigStore()
+ {
+ return _configStore;
+ }
+
+ public void setConfigStore(final ConfigStore configStore)
+ {
+ _configStore = configStore;
+ }
+
public static boolean isConfigured()
{
return isConfigured(DEFAULT_INSTANCE);
@@ -151,6 +195,7 @@
_logger.info("Shuting down ApplicationRegistry(" + instanceID + "):" + instance);
}
instance.close();
+ instance.getBroker().getSystem().removeBroker(instance.getBroker());
}
}
catch (Exception e)
@@ -316,5 +361,32 @@
{
return _rootMessageLogger;
}
-
+
+ public UUID getBrokerId()
+ {
+ return _brokerId;
+ }
+
+ public QMFService getQMFService()
+ {
+ return _qmfService;
+ }
+
+ public BrokerConfig getBroker()
+ {
+ return _broker;
+ }
+
+ public void setBroker(final BrokerConfig broker)
+ {
+ _broker = broker;
+ }
+
+ public VirtualHost createVirtualHost(final VirtualHostConfiguration vhostConfig) throws Exception
+ {
+ VirtualHostImpl virtualHost = new VirtualHostImpl(this, vhostConfig);
+ _virtualHostRegistry.registerVirtualHost(virtualHost);
+ getBroker().addVirtualHost(virtualHost);
+ return virtualHost;
+ }
}
Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java?rev=904934&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java Sun Jan 31 00:31:49 2010
@@ -0,0 +1,163 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.registry;
+
+import org.apache.qpid.server.configuration.*;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.common.QpidProperties;
+
+import java.util.UUID;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class BrokerConfigAdapter implements BrokerConfig
+{
+ private final IApplicationRegistry _instance;
+ private final int _instanceId;
+ private SystemConfig _system;
+
+ private final Map<UUID, VirtualHostConfig> _vhosts = new ConcurrentHashMap<UUID, VirtualHostConfig>();
+ private final long _createTime = System.currentTimeMillis();
+ private UUID _id;
+ private String _federationTag;
+
+ public BrokerConfigAdapter(final IApplicationRegistry instance, final int instanceID)
+ {
+ _instance = instance;
+ _instanceId = instanceID;
+ _id = instance.getConfigStore().createId();
+ _federationTag = UUID.randomUUID().toString();
+ }
+
+ public void setSystem(final SystemConfig system)
+ {
+ _system = system;
+ }
+
+ public SystemConfig getSystem()
+ {
+ return _system;
+ }
+
+ public Integer getPort()
+ {
+ List ports = _instance.getConfiguration().getPorts();
+ if(ports.size() > 0)
+ {
+ return Integer.valueOf(ports.get(0).toString());
+ }
+ else
+ {
+ return 0;
+ }
+ }
+
+ public Integer getWorkerThreads()
+ {
+ return _instance.getConfiguration().getProcessors();
+ }
+
+ public Integer getMaxConnections()
+ {
+ return 0;
+ }
+
+ public Integer getConnectionBacklogLimit()
+ {
+ return 0;
+ }
+
+ public Long getStagingThreshold()
+ {
+ return 0L;
+ }
+
+ public Integer getManagementPublishInterval()
+ {
+ return 10;
+ }
+
+ public String getVersion()
+ {
+ return QpidProperties.getReleaseVersion() + " [Build: " + QpidProperties.getBuildVersion() + "]";
+ }
+
+ public String getDataDirectory()
+ {
+ return _instance.getConfiguration().getQpidWork();
+ }
+
+ public void addVirtualHost(final VirtualHostConfig virtualHost)
+ {
+ virtualHost.setBroker(this);
+ _vhosts.put(virtualHost.getId(), virtualHost);
+ getConfigStore().addConfiguredObject(virtualHost);
+
+ }
+
+ private ConfigStore getConfigStore()
+ {
+ return _instance.getConfigStore();
+ }
+
+ public long getCreateTime()
+ {
+ return _createTime;
+ }
+
+ public void createBrokerConnection(final String transport,
+ final String host,
+ final int port,
+ final boolean durable,
+ final String authMechanism,
+ final String username,
+ final String password)
+ {
+ VirtualHost vhost = _instance.getVirtualHostRegistry().getDefaultVirtualHost();
+ vhost.createBrokerConnection(transport, host, port, "", durable, authMechanism, username, password);
+ }
+
+ public UUID getId()
+ {
+ return _id;
+ }
+
+ public BrokerConfigType getConfigType()
+ {
+ return BrokerConfigType.getInstance();
+ }
+
+ public ConfiguredObject getParent()
+ {
+ return _system;
+ }
+
+ public boolean isDurable()
+ {
+ return false;
+ }
+
+ public String getFederationTag()
+ {
+ return _federationTag;
+ }
+}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java Sun Jan 31 00:31:49 2010
@@ -21,13 +21,15 @@
package org.apache.qpid.server.registry;
import org.apache.commons.configuration.ConfigurationException;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.common.QpidProperties;
+import org.apache.qpid.qmf.QMFService;
import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.logging.RootMessageLoggerImpl;
-import org.apache.qpid.server.logging.messages.BrokerMessages;
-import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.BrokerActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.BrokerMessages;
import org.apache.qpid.server.logging.rawloggers.Log4jMessageLogger;
import org.apache.qpid.server.management.JMXManagedObjectRegistry;
import org.apache.qpid.server.management.NoopManagedObjectRegistry;
@@ -36,7 +38,6 @@
import org.apache.qpid.server.security.auth.database.ConfigurationFilePrincipalDatabaseManager;
import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import java.io.File;
@@ -51,6 +52,9 @@
public void initialise(int instanceID) throws Exception
{
+ _qmfService = new QMFService(getConfigStore(), this);
+
+
_rootMessageLogger = new RootMessageLoggerImpl(_configuration,
new Log4jMessageLogger());
@@ -75,6 +79,7 @@
_databaseManager.initialiseManagement(_configuration);
+
_managedObjectRegistry.start();
initialiseVirtualHosts();
@@ -91,6 +96,7 @@
try
{
super.close();
+ _qmfService.close();
}
finally
{
@@ -102,7 +108,7 @@
{
for (String name : _configuration.getVirtualHosts())
{
- _virtualHostRegistry.registerVirtualHost(new VirtualHostImpl(_configuration.getVirtualHostConfig(name)));
+ createVirtualHost(_configuration.getVirtualHostConfig(name));
}
getVirtualHostRegistry().setDefaultVirtualHostName(_configuration.getDefaultVirtualHost());
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java Sun Jan 31 00:31:49 2010
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,22 +20,25 @@
*/
package org.apache.qpid.server.registry;
-import java.util.Collection;
-import java.net.InetSocketAddress;
-
-import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
+
+import org.apache.qpid.qmf.QMFService;
+import org.apache.qpid.server.configuration.BrokerConfig;
+import org.apache.qpid.server.configuration.ConfigStore;
import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.logging.RootMessageLogger;
import org.apache.qpid.server.management.ManagedObjectRegistry;
import org.apache.qpid.server.plugins.PluginManager;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
-import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
import org.apache.qpid.server.security.access.ACLManager;
-import org.apache.qpid.server.security.access.ACLPlugin;
-import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.transport.QpidAcceptor;
-import org.apache.mina.common.IoAcceptor;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+
+import java.net.InetSocketAddress;
+import java.util.UUID;
public interface IApplicationRegistry
{
@@ -81,4 +84,17 @@
*/
void addAcceptor(InetSocketAddress bindAddress, QpidAcceptor acceptor);
+ public UUID getBrokerId();
+
+ QMFService getQMFService();
+
+ void setBroker(BrokerConfig broker);
+
+ BrokerConfig getBroker();
+
+ VirtualHost createVirtualHost(VirtualHostConfiguration vhostConfig) throws Exception;
+
+ ConfigStore getConfigStore();
+
+ void setConfigStore(ConfigStore store);
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org