You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2010/01/31 01:31:57 UTC

svn commit: r904934 [7/11] - in /qpid/trunk/qpid/java: broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/ broker/ broker/src/main/java/org/apache/qpid/qmf/ br...

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Sun Jan 31 00:31:49 2010
@@ -20,50 +20,16 @@
  */
 package org.apache.qpid.server.protocol;
 
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.security.Principal;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicMarkableReference;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.management.JMException;
-import javax.security.sasl.SaslServer;
-
 import org.apache.log4j.Logger;
 import org.apache.mina.transport.vmpipe.VmPipeAddress;
+
 import org.apache.qpid.AMQChannelException;
 import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.codec.AMQCodecFactory;
 import org.apache.qpid.codec.AMQDecoder;
 import org.apache.qpid.common.ClientProperties;
-import org.apache.qpid.framing.AMQBody;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.AMQProtocolHeaderException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.HeartbeatBody;
-import org.apache.qpid.framing.MethodDispatcher;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.ProtocolInitiation;
-import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.*;
 import org.apache.qpid.pool.Job;
 import org.apache.qpid.pool.ReferenceCountingExecutorService;
 import org.apache.qpid.protocol.AMQConstant;
@@ -71,6 +37,10 @@
 import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.protocol.ProtocolEngine;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.configuration.ConfiguredObject;
+import org.apache.qpid.server.configuration.ConnectionConfig;
+import org.apache.qpid.server.configuration.ConnectionConfigType;
 import org.apache.qpid.server.handler.ServerMethodDispatcherImpl;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.LogSubject;
@@ -85,12 +55,31 @@
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.state.AMQState;
 import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
 import org.apache.qpid.transport.NetworkDriver;
 import org.apache.qpid.transport.Sender;
 
-public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocolSession
+import javax.management.JMException;
+import javax.security.sasl.SaslServer;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig
 {
     private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
 
@@ -161,6 +150,8 @@
     private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
     private long _maxFrameSize;
     private final AtomicBoolean _closing = new AtomicBoolean(false);
+    private final UUID _id;
+    private final ConfigStore _configStore;
 
     public ManagedObject getManagedObject()
     {
@@ -181,6 +172,10 @@
 
         _logSubject = new ConnectionLogSubject(this);
 
+        _configStore = virtualHostRegistry.getConfigStore();
+        _id = _configStore.createId();
+
+
         _actor.message(ConnectionMessages.CON_OPEN(null, null, false, false));
 
     }
@@ -765,6 +760,8 @@
 
     public void closeProtocolSession()
     {
+        getConfigStore().removeConfiguredObject(this);
+
         _networkDriver.close();
         try
         {
@@ -902,6 +899,8 @@
         _virtualHost = virtualHost;
 
         _virtualHost.getConnectionRegistry().registerConnection(this);
+        
+        _configStore.addConfiguredObject(this);
 
         try
         {
@@ -1067,4 +1066,69 @@
         }
     }
 
+    public Boolean isIncoming()
+    {
+        return true;
+    }
+
+    public Boolean isSystemConnection()
+    {
+        return false;
+    }
+
+    public Boolean isFederationLink()
+    {
+        return false;
+    }
+
+    public String getAuthId()
+    {
+        return getAuthorizedID().getName();
+    }
+
+    public Integer getRemotePID()
+    {
+        return null;
+    }
+
+    public String getRemoteProcessName()
+    {
+        return null;
+    }
+
+    public Integer getRemoteParentPID()
+    {
+        return null;
+    }
+
+    public ConfigStore getConfigStore()
+    {
+        return _configStore;
+    }
+
+    public ConnectionConfigType getConfigType()
+    {
+        return ConnectionConfigType.getInstance();
+    }
+
+    public ConfiguredObject getParent()
+    {
+        return getVirtualHost();
+    }
+
+    public boolean isDurable()
+    {
+        return false;
+    }
+
+    public UUID getId()
+    {
+        return _id;
+    }
+
+    public String getAddress()
+    {
+        return String.valueOf(getRemoteAddress());
+    }
+
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java Sun Jan 31 00:31:49 2010
@@ -37,8 +37,19 @@
  */
 package org.apache.qpid.server.protocol;
 
-import java.util.Date;
-import java.util.List;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.management.common.mbeans.ManagedConnection;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.ManagementActor;
+import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.ManagedObject;
 
 import javax.management.JMException;
 import javax.management.MBeanException;
@@ -55,22 +66,8 @@
 import javax.management.openmbean.TabularData;
 import javax.management.openmbean.TabularDataSupport;
 import javax.management.openmbean.TabularType;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.management.common.mbeans.ManagedConnection;
-import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
-import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.ManagementActor;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.RootMessageLogger;
-import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.ManagedObject;
+import java.util.Date;
+import java.util.List;
 
 /**
  * This MBean class implements the management interface. In order to make more attributes, operations and notifications
@@ -255,7 +252,7 @@
             Object[] itemValues =
                 {
                     channel.getChannelId(), channel.isTransactional(),
-                    (channel.getDefaultQueue() != null) ? channel.getDefaultQueue().getName().asString() : null,
+                    (channel.getDefaultQueue() != null) ? channel.getDefaultQueue().getNameShortString().asString() : null,
                     channel.getUnacknowledgedMessageMap().size(), channel.getBlocking()
                 };
 
@@ -291,7 +288,7 @@
         // then the CurrentActor could be set in our JMX Proxy object.
         // As it is we need to set the CurrentActor on all MBean methods
         // Ideally we would not have a single method that can be called from
-        // two contexts.        
+        // two contexts.
         boolean removeActor = false;
         if (CurrentActor.get() == null)
         {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java Sun Jan 31 00:31:49 2010
@@ -20,14 +20,14 @@
 */
 package org.apache.qpid.server.protocol;
 
+import org.apache.log4j.Logger;
+
 import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.transport.NetworkDriver;
-import org.apache.qpid.transport.Connection;
-import org.apache.qpid.transport.ConnectionDelegate;
 import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory.VERSION;
+import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.transport.ServerConnection;
-import org.apache.log4j.Logger;
+import org.apache.qpid.transport.ConnectionDelegate;
+import org.apache.qpid.transport.NetworkDriver;
 
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
@@ -239,10 +239,10 @@
             final ConnectionDelegate connDelegate =
                     new org.apache.qpid.server.transport.ServerConnectionDelegate(_appRegistry, _fqdn);
 
-            Connection conn = new ServerConnection();
+            ServerConnection conn = new ServerConnection();
             conn.setConnectionDelegate(connDelegate);
 
-            return new ProtocolEngine_0_10( conn, _networkDriver);
+            return new ProtocolEngine_0_10( conn, _networkDriver, _appRegistry);
         }
     };
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java Sun Jan 31 00:31:49 2010
@@ -26,23 +26,34 @@
 import org.apache.qpid.transport.network.InputHandler;
 import org.apache.qpid.transport.network.Assembler;
 import org.apache.qpid.transport.network.Disassembler;
+import org.apache.qpid.server.configuration.*;
+import org.apache.qpid.server.transport.ServerConnection;
+import org.apache.qpid.server.registry.IApplicationRegistry;
 
 import java.net.SocketAddress;
+import java.util.UUID;
 
-public class ProtocolEngine_0_10  extends InputHandler implements ProtocolEngine
+public class ProtocolEngine_0_10  extends InputHandler implements ProtocolEngine, ConnectionConfig
 {
     public static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
 
     private NetworkDriver _networkDriver;
     private long _readBytes;
     private long _writtenBytes;
-    private Connection _connection;
-
-    public ProtocolEngine_0_10(Connection conn, NetworkDriver networkDriver)
+    private ServerConnection _connection;
+    private final UUID _id;
+    private final IApplicationRegistry _appRegistry;
+
+    public ProtocolEngine_0_10(ServerConnection conn,
+                               NetworkDriver networkDriver,
+                               final IApplicationRegistry appRegistry)
     {
         super(new Assembler(conn));
         _connection = conn;
+        _connection.setConnectionConfig(this);
         _networkDriver = networkDriver;
+        _id = appRegistry.getConfigStore().createId();
+        _appRegistry = appRegistry;
     }
 
     public void setNetworkDriver(NetworkDriver driver)
@@ -50,6 +61,14 @@
         _networkDriver = driver;
         Disassembler dis = new Disassembler(driver, MAX_FRAME_SIZE);
         _connection.setSender(dis);
+        _connection.onOpen(new Runnable()
+        {
+            public void run()
+            {
+                getConfigStore().addConfiguredObject(ProtocolEngine_0_10.this);
+            }
+        });
+
     }
 
     public SocketAddress getRemoteAddress()
@@ -81,4 +100,81 @@
     {
         //Todo
     }
+
+    public VirtualHostConfig getVirtualHost()
+    {
+        return _connection.getVirtualHost();
+    }
+
+    public String getAddress()
+    {
+        return String.valueOf(getRemoteAddress()); // null-safe; same form as AMQProtocolEngine.getAddress()
+    }
+
+    public Boolean isIncoming()
+    {
+        return true;
+    }
+
+    public Boolean isSystemConnection()
+    {
+        return false;
+    }
+
+    public Boolean isFederationLink()
+    {
+        return false;
+    }
+
+    public String getAuthId()
+    {
+        return _connection.getAuthorizationID();
+    }
+
+    public String getRemoteProcessName()
+    {
+        return null;
+    }
+
+    public Integer getRemotePID()
+    {
+        return null;
+    }
+
+    public Integer getRemoteParentPID()
+    {
+        return null;
+    }
+
+    public ConfigStore getConfigStore()
+    {
+        return _appRegistry.getConfigStore();
+    }
+
+    public UUID getId()
+    {
+        return _id;
+    }
+
+    public ConnectionConfigType getConfigType()
+    {
+        return ConnectionConfigType.getInstance();
+    }
+
+    public ConfiguredObject getParent()
+    {
+        return getVirtualHost();
+    }
+
+    public boolean isDurable()
+    {
+        return false;
+    }
+
+    @Override
+    public void closed()
+    {
+        super.closed();
+        getConfigStore().removeConfiguredObject(this);
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java Sun Jan 31 00:31:49 2010
@@ -21,9 +21,11 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.subscription.SubscriptionList;
 import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionList;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.util.Map;
 
 public class AMQPriorityQueue extends SimpleAMQQueue
 {
@@ -32,18 +34,18 @@
                                final AMQShortString owner,
                                final boolean autoDelete,
                                final VirtualHost virtualHost,
-                               int priorities)
+                               int priorities, Map<String, Object> arguments)
     {
-        super(name, durable, owner, autoDelete, virtualHost, new PriorityQueueList.Factory(priorities));
+        super(name, durable, owner, autoDelete, virtualHost, new PriorityQueueList.Factory(priorities),arguments);
     }
 
     public AMQPriorityQueue(String queueName,
                             boolean durable,
                             String owner,
                             boolean autoDelete,
-                            VirtualHost virtualHost, int priorities)
+                            VirtualHost virtualHost, int priorities, Map<String,Object> arguments)
     {
-        this(new AMQShortString(queueName), durable, new AMQShortString(owner),autoDelete,virtualHost,priorities);
+        this(queueName == null ? null : new AMQShortString(queueName), durable, owner == null ? null : new AMQShortString(owner),autoDelete,virtualHost,priorities, arguments);
     }
 
     public int getPriorities()

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Sun Jan 31 00:31:49 2010
@@ -20,44 +20,48 @@
  */
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.server.management.Managable;
-import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.configuration.QueueConfig;
 import org.apache.qpid.server.configuration.QueueConfiguration;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.ExchangeReferrer;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.management.Managable;
+import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.security.PrincipalHolder;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.AMQException;
+import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.List;
-import java.util.Set;
 import java.util.Map;
+import java.util.Set;
 
-public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeReferrer, TransactionLogResource
+public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeReferrer, TransactionLogResource, BaseQueue,
+                                  QueueConfig
 {
     boolean getDeleteOnNoConsumers();
 
     void setDeleteOnNoConsumers(boolean b);
 
+    void addBinding(Binding binding);
+
+    void removeBinding(Binding binding);
+
+    List<Binding> getBindings();
+
+    int getBindingCount();
 
     public interface Context
     {
         QueueEntry getLastSeenEntry();
     }
 
-    AMQShortString getName();
-
     void setNoLocal(boolean b);
 
-    boolean isDurable();
-
     boolean isAutoDelete();
 
     AMQShortString getOwner();
@@ -69,14 +73,6 @@
 
     VirtualHost getVirtualHost();
 
-
-    void bind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException;
-
-    void unBind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException;
-
-    List<ExchangeBinding> getExchangeBindings();
-
-
     void registerSubscription(final Subscription subscription, final boolean exclusive) throws AMQException;
 
     void unregisterSubscription(final Subscription subscription) throws AMQException;
@@ -86,6 +82,8 @@
 
     int getActiveConsumerCount();
 
+    boolean hasExclusiveSubscriber();
+
     boolean isUnused();
 
     boolean isEmpty();
@@ -107,8 +105,6 @@
     int delete() throws AMQException;
 
 
-    QueueEntry enqueue(ServerMessage message) throws AMQException;
-
     void requeue(QueueEntry entry);
 
     void requeue(QueueEntryImpl storeContext, Subscription subscription);
@@ -122,6 +118,8 @@
 
 
     void addQueueDeleteTask(final Task task);
+    void removeQueueDeleteTask(final Task task);
+
 
 
     List<QueueEntry> getMessagesOnTheQueue();

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java Sun Jan 31 00:31:49 2010
@@ -23,16 +23,21 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.configuration.QueueConfiguration;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.configuration.QueueConfiguration;
 
 import java.util.Map;
+import java.util.HashMap;
 
 
 public class AMQQueueFactory
 {
     public static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities");
 
+    private static final String QPID_LVQ_KEY = "qpid.LVQ_key";
+    private static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue";
+    private static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key";
+
     private abstract static class QueueProperty
     {
 
@@ -130,21 +135,60 @@
                                               boolean autoDelete,
                                               VirtualHost virtualHost, final FieldTable arguments)
     {
-        final int priorities = arguments == null ? 1 : arguments.containsKey(X_QPID_PRIORITIES) ? arguments.getInteger(X_QPID_PRIORITIES) : 1;
+        return createAMQQueueImpl(name == null ? null : name.toString(),
+                                  durable,
+                                  owner == null ? null : owner.toString(),
+                                  autoDelete,
+                                  virtualHost,
+                                  FieldTable.convertToMap(arguments));
+    }
 
-        AMQQueue q = null;
-        if(priorities > 1)
+
+    public static AMQQueue createAMQQueueImpl(String queueName,
+                                              boolean durable,
+                                              String owner,
+                                              boolean autoDelete,
+                                              VirtualHost virtualHost, Map<String, Object> arguments)
+    {
+        int priorities = 1;
+        String conflationKey = null;
+        if(arguments != null)
         {
-            q = new AMQPriorityQueue(name, durable, owner, autoDelete, virtualHost, priorities);
+            if(arguments.containsKey(QPID_LAST_VALUE_QUEUE) || arguments.containsKey(QPID_LAST_VALUE_QUEUE_KEY))
+            {
+                conflationKey = (String) arguments.get(QPID_LAST_VALUE_QUEUE_KEY);
+                if(conflationKey == null)
+                {
+                    conflationKey = QPID_LVQ_KEY;
+                }
+            }
+            else if(arguments.containsKey(X_QPID_PRIORITIES))
+            {
+                Object prioritiesObj = arguments.get(X_QPID_PRIORITIES);
+                if(prioritiesObj instanceof Number)
+                {
+                    priorities = ((Number)prioritiesObj).intValue();
+                }
+            }
+        }
+
+        AMQQueue q;
+        if(conflationKey != null)
+        {
+            q = new ConflationQueue(queueName, durable, owner, autoDelete, virtualHost, arguments, conflationKey);
+        }
+        else if(priorities > 1)
+        {
+            q = new AMQPriorityQueue(queueName, durable, owner, autoDelete, virtualHost, priorities, arguments);
         }
         else
         {
-            q = new SimpleAMQQueue(name, durable, owner, autoDelete, virtualHost);
+            q = new SimpleAMQQueue(queueName, durable, owner, autoDelete, virtualHost, arguments);
         }
 
         //Register the new queue
         virtualHost.getQueueRegistry().registerQueue(q);
-        q.configure(virtualHost.getConfiguration().getQueueConfiguration(name.asString()));
+        q.configure(virtualHost.getConfiguration().getQueueConfiguration(queueName));
 
         if(arguments != null)
         {
@@ -158,29 +202,43 @@
         }
 
         return q;
+
     }
 
+
     public static AMQQueue createAMQQueueImpl(QueueConfiguration config, VirtualHost host) throws AMQException
     {
-        AMQShortString queueName = new AMQShortString(config.getName());
+        String queueName = config.getName();
 
         boolean durable = config.getDurable();
         boolean autodelete = config.getAutoDelete();
-        AMQShortString owner = (config.getOwner() != null) ? new AMQShortString(config.getOwner()) : null;
-        FieldTable arguments = null;
-        boolean priority = config.getPriority();
-        int priorities = config.getPriorities();
-        if(priority || priorities > 0)
+        String owner = config.getOwner();
+        Map<String,Object> arguments = null;
+        if(config.isLVQ() || config.getLVQKey() != null)
         {
             if(arguments == null)
             {
-                arguments = new FieldTable();
+                arguments = new HashMap<String,Object>();
             }
-            if (priorities < 0)
+            arguments.put(QPID_LAST_VALUE_QUEUE, 1);
+            arguments.put(QPID_LAST_VALUE_QUEUE_KEY, config.getLVQKey() == null ? QPID_LVQ_KEY : config.getLVQKey());
+        }
+        else
+        {
+            boolean priority = config.getPriority();
+            int priorities = config.getPriorities();
+            if(priority || priorities > 0)
             {
-                priorities = 10;
+                if(arguments == null)
+                {
+                    arguments = new HashMap<String,Object>();
+                }
+                if (priorities < 0)
+                {
+                    priorities = 10;
+                }
+                arguments.put("x-qpid-priorities", priorities);
             }
-            arguments.put(new AMQShortString("x-qpid-priorities"), priorities);
         }
 
         AMQQueue q = createAMQQueueImpl(queueName, durable, owner, autodelete, host, arguments);
@@ -188,38 +246,4 @@
         return q;
     }
 
-    public static AMQQueue createAMQQueueImpl(String queueName,
-                                              boolean durable,
-                                              String owner,
-                                              boolean autoDelete,
-                                              VirtualHost virtualHost, Map<String, Object> arguments)
-            throws AMQException
-    {
-        int priorities = 1;
-        if(arguments != null && arguments.containsKey(X_QPID_PRIORITIES))
-        {
-            Object prioritiesObj = arguments.get(X_QPID_PRIORITIES);
-            if(prioritiesObj instanceof Number)
-            {
-                priorities = ((Number)prioritiesObj).intValue();
-            }
-        }
-
-
-        AMQQueue q = null;
-        if(priorities > 1)
-        {
-            q = new AMQPriorityQueue(queueName, durable, owner, autoDelete, virtualHost, priorities);
-        }
-        else
-        {
-            q = new SimpleAMQQueue(queueName, durable, owner, autoDelete, virtualHost);
-        }
-
-        //Register the new queue
-        virtualHost.getQueueRegistry().registerQueue(q);
-        q.configure(virtualHost.getConfiguration().getQueueConfiguration(queueName));
-        return q;
-
-    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Sun Jan 31 00:31:49 2010
@@ -95,7 +95,7 @@
     {
         super(ManagedQueue.class, ManagedQueue.TYPE);
         _queue = queue;
-        _queueName = jmxEncode(new StringBuffer(queue.getName()), 0).toString();
+        _queueName = jmxEncode(new StringBuffer(queue.getNameShortString()), 0).toString();
     }
 
     public ManagedObject getParentObject()
@@ -252,7 +252,7 @@
         {
             throw new IllegalArgumentException("Capacity must not be less than FlowResumeCapacity");
         }
-        
+
     	_queue.setCapacity(capacity);
     }
 
@@ -267,10 +267,10 @@
         {
             throw new IllegalArgumentException("FlowResumeCapacity must not exceed Capacity");
         }
-        
+
         _queue.setFlowResumeCapacity(flowResumeCapacity);
     }
-    
+
     public boolean isFlowOverfull()
     {
         return _queue.isOverfull();
@@ -309,7 +309,7 @@
     public void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg)
     {
         // important : add log to the log file - monitoring tools may be looking for this
-        _logger.info(notification.name() + " On Queue " + queue.getName() + " - " + notificationMsg);
+        _logger.info(notification.name() + " On Queue " + queue.getNameShortString() + " - " + notificationMsg);
         notificationMsg = notification.name() + " " + notificationMsg;
 
         _lastNotification =
@@ -509,7 +509,7 @@
     private String[] getMessageTransferMessageHeaderProps(MessageTransferMessage msg)
     {
         List<String> list = new ArrayList<String>();
-        
+
         AMQMessageHeader header = msg.getMessageHeader();
         MessageProperties msgProps = msg.getHeader().get(MessageProperties.class);
 

Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java?rev=904934&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BaseQueue.java Sun Jan 31 00:31:49 2010
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+
+/** Minimal queue contract: enqueueing of messages plus durability and name accessors. */
+public interface BaseQueue extends TransactionLogResource
+{
+    /** Callback that receives the queue entry created by an enqueue. */
+    interface PostEnqueueAction
+    {
+        void onEnqueue(QueueEntry entry);
+    }
+
+    void enqueue(ServerMessage message) throws AMQException;
+    void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException;
+    boolean isDurable();
+    AMQShortString getNameShortString();
+}

Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java?rev=904934&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java Sun Jan 31 00:31:49 2010
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.util.Map;
+
+/** Queue whose entries are held in a ConflationQueueList created for the given conflation key. */
+public class ConflationQueue extends SimpleAMQQueue
+{
+    /**
+     * Builds the queue around a conflating entry-list factory.
+     * @param conflationKey passed to ConflationQueueList.Factory; every other argument is
+     *                      forwarded unchanged to the SimpleAMQQueue constructor
+     */
+    protected ConflationQueue(String name, boolean durable, String owner, boolean autoDelete,
+                              VirtualHost virtualHost, Map<String, Object> args, String conflationKey)
+    {
+        super(name, durable, owner, autoDelete, virtualHost,
+              new ConflationQueueList.Factory(conflationKey), args);
+    }
+}

Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java?rev=904934&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java Sun Jan 31 00:31:49 2010
@@ -0,0 +1,168 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.message.ServerMessage;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+/** Entry list that keeps only the newest entry for each distinct value of the conflation header. */
+public class ConflationQueueList extends SimpleQueueEntryList
+{
+    /** Name of the message header whose value identifies entries that supersede one another. */
+    private final String _conflationKey;
+
+    /** For each header value seen so far, a holder of the newest entry carrying that value. */
+    private final ConcurrentHashMap<Object, AtomicReference<QueueEntry>> _latestValuesMap =
+        new ConcurrentHashMap<Object, AtomicReference<QueueEntry>>();
+
+    public ConflationQueueList(AMQQueue queue, String conflationKey)
+    {
+        super(queue);
+        _conflationKey = conflationKey;
+    }
+
+    @Override
+    protected ConflationQueueEntry createQueueEntry(ServerMessage message)
+    {
+        return new ConflationQueueEntry(this, message);
+    }
+
+    /**
+     * Appends the message; if it carries the conflation header, the older of the new entry and
+     * the entry previously recorded for that header value is discarded.
+     */
+    @Override
+    public QueueEntry add(final ServerMessage message)
+    {
+        ConflationQueueEntry entry = (ConflationQueueEntry) (super.add(message));
+        AtomicReference<QueueEntry> latestValueReference = null;
+
+        Object value = message.getMessageHeader().getHeader(_conflationKey);
+        if(value != null)
+        {
+            latestValueReference = _latestValuesMap.get(value);
+            if(latestValueReference == null)
+            {
+                _latestValuesMap.putIfAbsent(value, new AtomicReference<QueueEntry>(entry));
+                latestValueReference = _latestValuesMap.get(value);
+            }
+            QueueEntry oldEntry;
+            do
+            {
+                oldEntry = latestValueReference.get();
+            }
+            while(oldEntry.compareTo(entry) < 0 && !latestValueReference.compareAndSet(oldEntry, entry));
+
+            if(oldEntry.compareTo(entry) < 0)
+            {
+                // We replaced some other entry to become the newest value. discardEntry() acquires
+                // the entry itself, so it must not be acquired here first (NOTE: presumably a
+                // second acquire() on an acquired entry fails - confirm in QueueEntryImpl).
+                discardEntry(oldEntry);
+            }
+            else if (oldEntry.compareTo(entry) > 0)
+            {
+                // A newer entry came along
+                discardEntry(entry);
+            }
+        }
+
+        entry.setLatestValueReference(latestValueReference);
+        return entry;
+    }
+
+    /** Acquires the entry and, if that succeeds, dequeues it through an AutoCommitTransaction. */
+    private void discardEntry(final QueueEntry entry)
+    {
+        if(entry.acquire())
+        {
+            ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog());
+            txn.dequeue(entry.getQueue(),entry.getMessage(),
+                                    new ServerTransaction.Action()
+                                {
+                                    public void postCommit()
+                                    {
+                                        entry.discard();
+                                    }
+
+                                    public void onRollback()
+                                    {
+                                        // intentionally empty
+                                    }
+                                });
+        }
+    }
+
+    /** Entry that discards itself on release if it is no longer the latest for its header value. */
+    private final class ConflationQueueEntry extends QueueEntryImpl
+    {
+        /** Holder of the latest entry for this entry's header value; null if the header was absent. */
+        private AtomicReference<QueueEntry> _latestValueReference;
+
+        public ConflationQueueEntry(SimpleQueueEntryList queueEntryList, ServerMessage message)
+        {
+            super(queueEntryList, message);
+        }
+
+        @Override
+        public void release()
+        {
+            super.release();
+            // Superseded while it was held: discard it.
+            if(_latestValueReference != null && _latestValueReference.get() != this)
+            {
+                discardEntry(this);
+            }
+        }
+
+        public void setLatestValueReference(final AtomicReference<QueueEntry> latestValueReference)
+        {
+            _latestValueReference = latestValueReference;
+        }
+    }
+
+    /** Creates ConflationQueueList instances bound to a fixed conflation key. */
+    static class Factory implements QueueEntryListFactory
+    {
+        private final String _conflationKey;
+
+        Factory(String conflationKey)
+        {
+            _conflationKey = conflationKey;
+        }
+
+        public QueueEntryList createQueueEntryList(AMQQueue queue)
+        {
+            return new ConflationQueueList(queue, _conflationKey);
+        }
+    }
+}

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java Sun Jan 31 00:31:49 2010
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
@@ -46,7 +45,7 @@
 
     public void registerQueue(AMQQueue queue)
     {
-        _queueMap.put(queue.getName(), queue);
+        _queueMap.put(queue.getNameShortString(), queue);
     }
 
     public void unregisterQueue(AMQShortString name)

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java Sun Jan 31 00:31:49 2010
@@ -63,7 +63,7 @@
      * delivered. It is <b>cleared after delivery has been attempted</b>. Any persistent record of destinations is done
      * by the message handle.
      */
-    private ArrayList<AMQQueue> _destinationQueues;
+    private ArrayList<? extends BaseQueue> _destinationQueues;
 
     private long _expiration;
 
@@ -131,7 +131,7 @@
     }
 
 
-    public ArrayList<AMQQueue> getDestinationQueues()
+    public ArrayList<? extends BaseQueue> getDestinationQueues()
     {
         return _destinationQueues;
     }
@@ -225,7 +225,7 @@
 
     }
 
-    public void enqueue(final ArrayList<AMQQueue> queues)
+    public void enqueue(final ArrayList<? extends BaseQueue> queues)
     {
         _destinationQueues = queues;
     }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Sun Jan 31 00:31:49 2010
@@ -28,6 +28,7 @@
 {
 
 
+
     public static enum State
     {
         AVAILABLE,
@@ -163,6 +164,8 @@
 
     boolean expired() throws AMQException;
 
+    boolean isAvailable();
+
     boolean isAcquired();
 
     boolean acquire();

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Sun Jan 31 00:31:49 2010
@@ -20,20 +20,23 @@
  */
 package org.apache.qpid.server.queue;
 
+import org.apache.log4j.Logger;
+
 import org.apache.qpid.AMQException;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.log4j.Logger;
 
-import java.util.*;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 
 public class QueueEntryImpl implements QueueEntry
@@ -154,6 +157,11 @@
 
     }
 
+    public boolean isAvailable()
+    {
+        return _state == AVAILABLE_STATE;
+    }
+
     public boolean isAcquired()
     {
         return _state.getState() == State.ACQUIRED;
@@ -408,7 +416,7 @@
 
             if(alternateExchange != null)
             {
-                final List<AMQQueue> rerouteQueues = alternateExchange.route(new InboundMessageAdapter(this));
+                final List<? extends BaseQueue> rerouteQueues = alternateExchange.route(new InboundMessageAdapter(this));
                 final ServerMessage message = getMessage();
                 if(rerouteQueues != null && rerouteQueues.size() != 0)
                 {
@@ -419,9 +427,9 @@
                         {
                             try
                             {
-                                for(AMQQueue queue : rerouteQueues)
+                                for(BaseQueue queue : rerouteQueues)
                                 {
-                                    QueueEntry entry = queue.enqueue(message);
+                                    queue.enqueue(message);
                                 }
                             }
                             catch (AMQException e)

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Sun Jan 31 00:31:49 2010
@@ -1,39 +1,51 @@
 package org.apache.qpid.server.queue;
 
-import java.util.*;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import javax.management.JMException;
-
 import org.apache.log4j.Logger;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.pool.ReadWriteRunnable;
 import org.apache.qpid.pool.ReferenceCountingExecutorService;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.configuration.ConfiguredObject;
+import org.apache.qpid.server.configuration.QueueConfigType;
 import org.apache.qpid.server.configuration.QueueConfiguration;
 import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.QueueActor;
+import org.apache.qpid.server.logging.messages.QueueMessages;
+import org.apache.qpid.server.logging.subjects.QueueLogSubject;
 import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.security.PrincipalHolder;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.SubscriptionList;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.security.PrincipalHolder;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.QueueActor;
-import org.apache.qpid.server.logging.subjects.QueueLogSubject;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.messages.QueueMessages;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.LocalTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import javax.management.JMException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 /*
 *
@@ -81,7 +93,7 @@
     private Exchange _alternateExchange;
 
     /** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */
-    private final ExchangeBindings _bindings = new ExchangeBindings(this);
+
 
 
     protected final QueueEntryList _entries;
@@ -102,8 +114,15 @@
 
     private final AtomicLong _totalMessagesReceived = new AtomicLong();
 
+    private final AtomicLong _dequeueCount = new AtomicLong(); // incremented by decrementQueueCount()
+    private final AtomicLong _dequeueSize = new AtomicLong(); // sum of getSize() of dequeued messages
+    private final AtomicLong _enqueueSize = new AtomicLong(); // sum of getSize() of enqueued messages
+    private final AtomicLong _persistentMessageEnqueueSize = new AtomicLong(); // persistent message on durable queue only
+    private final AtomicLong _persistentMessageDequeueSize = new AtomicLong(); // persistent message on durable queue only
+    private final AtomicLong _persistentMessageEnqueueCount = new AtomicLong(); // persistent message on durable queue only
+    private final AtomicLong _persistentMessageDequeueCount = new AtomicLong(); // persistent message on durable queue only
 
-
+    private final AtomicInteger _bindingCountHigh = new AtomicInteger();
 
     /** max allowed size(KB) of a single message */
     public long _maximumMessageSize = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageSize();
@@ -151,18 +170,38 @@
 
     private final AtomicBoolean _overfull = new AtomicBoolean(false);
     private boolean _deleteOnNoConsumers;
+    private final CopyOnWriteArrayList<Binding> _bindings = new CopyOnWriteArrayList<Binding>();
+    private UUID _id;
+    private final Map<String, Object> _arguments;
+
+    //TODO
+    private long _createTime = System.currentTimeMillis();
+
 
-    protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
+    protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost, Map<String,Object> arguments)
     {
-        this(name, durable, owner, autoDelete, virtualHost, new SimpleQueueEntryList.Factory());
+        this(name, durable, owner, autoDelete, virtualHost, new SimpleQueueEntryList.Factory(),arguments);
     }
 
+    public SimpleAMQQueue(String queueName, boolean durable, String owner, boolean autoDelete, VirtualHost virtualHost, Map<String, Object> arguments)
+    {
+        this(queueName, durable, owner,autoDelete,virtualHost,new SimpleQueueEntryList.Factory(),arguments);
+    }
+
+    public SimpleAMQQueue(String queueName, boolean durable, String owner, boolean autoDelete, VirtualHost virtualHost, QueueEntryListFactory entryListFactory, Map<String, Object> arguments)
+    {
+        this(queueName == null ? null : new AMQShortString(queueName), durable, owner == null ? null : new AMQShortString(owner),autoDelete,virtualHost,entryListFactory, arguments);
+    }
+
+
+
     protected SimpleAMQQueue(AMQShortString name,
                              boolean durable,
                              AMQShortString owner,
                              boolean autoDelete,
                              VirtualHost virtualHost,
-                             QueueEntryListFactory entryListFactory)
+                             QueueEntryListFactory entryListFactory,
+                             Map<String,Object> arguments)
     {
 
         if (name == null)
@@ -182,6 +221,9 @@
         _autoDelete = autoDelete;
         _virtualHost = virtualHost;
         _entries = entryListFactory.createQueueEntryList(this);
+        _arguments = arguments;
+
+        _id = virtualHost.getConfigStore().createId();
 
         _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
 
@@ -208,6 +250,8 @@
                                                           durable, !durable,
                                                           priorities > 0));
 
+        getConfigStore().addConfiguredObject(this);
+
         try
         {
             _managedObject = new AMQQueueMBean(this);
@@ -222,12 +266,6 @@
 
     }
 
-    public SimpleAMQQueue(String queueName, boolean durable, String owner, boolean autoDelete, VirtualHost virtualHost)
-            throws AMQException
-    {
-        this(new AMQShortString(queueName), durable, owner == null ? null : new AMQShortString(owner),autoDelete,virtualHost);
-    }
-
     public void resetNotifications()
     {
         // This ensure that the notification checks for the configured alerts are created.
@@ -244,7 +282,7 @@
         _asyncDelivery.execute(runnable);
     }
 
-    public AMQShortString getName()
+    public AMQShortString getNameShortString()
     {
         return _name;
     }
@@ -254,6 +292,21 @@
         _nolocal = nolocal;
     }
 
+    public UUID getId()
+    {
+        return _id;
+    }
+
+    public QueueConfigType getConfigType()
+    {
+        return QueueConfigType.getInstance();
+    }
+
+    public ConfiguredObject getParent()
+    {
+        return getVirtualHost();
+    }
+
     public boolean isDurable()
     {
         return _durable;
@@ -284,7 +337,7 @@
 
     public Map<String, Object> getArguments()
     {
-        return null;
+        return _arguments;
     }
 
     public boolean isAutoDelete()
@@ -313,56 +366,9 @@
         return _virtualHost;
     }
 
-    // ------ bind and unbind
-
-    public void bind(Exchange exchange, String bindingKey, Map<String, Object> arguments) throws AMQException
-    {
-
-        FieldTable fieldTable = FieldTable.convertToFieldTable(arguments);
-        AMQShortString routingKey = new AMQShortString(bindingKey);
-
-        exchange.registerQueue(routingKey, this, fieldTable);
-
-        if (isDurable() && exchange.isDurable())
-        {
-
-            _virtualHost.getDurableConfigurationStore().bindQueue(exchange, routingKey, this, fieldTable);
-        }
-
-        _bindings.addBinding(routingKey, fieldTable, exchange);
-    }
-
-
-    public void bind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException
-    {
-
-        exchange.registerQueue(routingKey, this, arguments);
-        if (isDurable() && exchange.isDurable())
-        {
-            _virtualHost.getDurableConfigurationStore().bindQueue(exchange, routingKey, this, arguments);
-        }
-
-        _bindings.addBinding(routingKey, arguments, exchange);
-    }
-
-    public void unBind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException
+    public String getName()
     {
-        exchange.deregisterQueue(routingKey, this, arguments);
-        if (isDurable() && exchange.isDurable())
-        {
-            _virtualHost.getDurableConfigurationStore().unbindQueue(exchange, routingKey, this, arguments);
-        }
-
-        boolean removed = _bindings.remove(routingKey, arguments, exchange);
-        if (!removed)
-        {
-            _logger.error("Mismatch between queue bindings and exchange record of bindings");
-        }
-    }
-
-    public List<ExchangeBinding> getExchangeBindings()
-    {
-        return new ArrayList<ExchangeBinding>(_bindings.getExchangeBindings());
+        return getNameShortString().toString();
     }
 
     // ------ Manage Subscriptions
@@ -370,7 +376,7 @@
     public synchronized void registerSubscription(final Subscription subscription, final boolean exclusive) throws AMQException
     {
 
-        if (isExclusiveSubscriber())
+        if (hasExclusiveSubscriber())
         {
             throw new ExistingExclusiveSubscription();
         }
@@ -459,17 +465,54 @@
         _deleteOnNoConsumers = b;
     }
 
+    /** Adds the binding, then raises the recorded binding-count high-water mark if exceeded. */
+    public void addBinding(final Binding binding)
+    {
+        _bindings.add(binding);
+        final int currentCount = _bindings.size();
+
+        // CAS retry: stop once the update succeeds or the recorded mark is no longer below currentCount.
+        int observedHigh = _bindingCountHigh.get();
+        while(currentCount > observedHigh && !_bindingCountHigh.compareAndSet(observedHigh, currentCount))
+        {
+            observedHigh = _bindingCountHigh.get();
+        }
+    }
+
+    public int getBindingCountHigh()
+    {
+        return _bindingCountHigh.get();
+    }
+
+    public void removeBinding(final Binding binding)
+    {
+        _bindings.remove(binding);
+    }
+
+    public List<Binding> getBindings()
+    {
+        return Collections.unmodifiableList(_bindings);
+    }
+
+    public int getBindingCount()
+    {
+        return getBindings().size();
+    }
 
     // ------ Enqueue / Dequeue
+    public void enqueue(ServerMessage message) throws AMQException
+    {
+        enqueue(message, null);
+    }
 
-    public QueueEntry enqueue(ServerMessage message) throws AMQException
+    public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
     {
 
         incrementQueueCount();
         incrementQueueSize(message);
-
         _totalMessagesReceived.incrementAndGet();
 
+
         QueueEntry entry;
         Subscription exclusiveSub = _exclusiveSubscriber;
 
@@ -554,7 +597,11 @@
             _managedObject.checkForNotification(entry.getMessage());
         }
 
-        return entry;
+        if(action != null)
+        {
+            action.onEnqueue(entry);
+        }
+
     }
 
     private void deliverToSubscription(final Subscription sub, final QueueEntry entry)
@@ -596,7 +643,14 @@
 
     private void incrementQueueSize(final ServerMessage message)
     {
-        getAtomicQueueSize().addAndGet(message.getSize());
+        long size = message.getSize();
+        getAtomicQueueSize().addAndGet(size);
+        _enqueueSize.addAndGet(size);
+        if(message.isPersistent() && isDurable())
+        {
+            _persistentMessageEnqueueSize.addAndGet(size);
+            _persistentMessageEnqueueCount.incrementAndGet();
+        }
     }
 
     private void incrementQueueCount()
@@ -654,7 +708,7 @@
 
         SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
         // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
-        while (subscriberIter.advance())
+        while (subscriberIter.advance() && entry.isAvailable())
         {
             Subscription sub = subscriberIter.getNode().getSubscription();
 
@@ -702,12 +756,21 @@
 
     private void decrementQueueSize(final QueueEntry entry)
     {
-        getAtomicQueueSize().addAndGet(-entry.getMessage().getSize());
+        final ServerMessage message = entry.getMessage();
+        long size = message.getSize();
+        getAtomicQueueSize().addAndGet(-size);
+        _dequeueSize.addAndGet(size);
+        if(message.isPersistent() && isDurable())
+        {
+            _persistentMessageDequeueSize.addAndGet(size);
+            _persistentMessageDequeueCount.incrementAndGet();
+        }
     }
 
     void decrementQueueCount()
     {
         getAtomicQueueCount().decrementAndGet();
+        _dequeueCount.incrementAndGet();
     }
 
     public boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException
@@ -834,7 +897,7 @@
 
     public int compareTo(final AMQQueue o)
     {
-        return _name.compareTo(o.getName());
+        return _name.compareTo(o.getNameShortString());
     }
 
     public AtomicInteger getAtomicQueueCount()
@@ -847,7 +910,7 @@
         return _atomicQueueSize;
     }
 
-    private boolean isExclusiveSubscriber()
+    public boolean hasExclusiveSubscriber()
     {
         return _exclusiveSubscriber != null;
     }
@@ -1099,6 +1162,45 @@
 
     }
 
+    /**
+     * Purges messages from this queue. A request of 0 delegates to clearQueue(); a positive request
+     * dequeues at most that many entries using one transaction; a negative request does nothing.
+     */
+    public void purge(final long request)
+    {
+        if(request == 0L)
+        {
+            clearQueue();
+        }
+        else if(request > 0L)
+        {
+            QueueEntryIterator queueListIterator = _entries.iterator();
+            long count = 0;
+            ServerTransaction txn = new LocalTransaction(getVirtualHost().getTransactionLog());
+
+            while (queueListIterator.advance())
+            {
+                QueueEntry node = queueListIterator.getNode();
+                // deleted entries, and entries that cannot be acquired, are skipped and not counted
+                if (!node.isDeleted() && node.acquire())
+                {
+                    dequeueEntry(node, txn);
+                    if(++count == request)
+                    {
+                        break;
+                    }
+                }
+            }
+
+            txn.commit();
+        }
+    }
+
+    public long getCreateTime()
+    {
+        return _createTime;
+    }
+
     // ------ Management functions
 
     public void deleteMessageFromTop()
@@ -1172,11 +1274,21 @@
         _deleteTaskList.add(task);
     }
 
+    public void removeQueueDeleteTask(final Task task)
+    {
+        _deleteTaskList.remove(task);
+    }
+
     public int delete() throws AMQException
     {
         if (!_deleted.getAndSet(true))
         {
 
+            for(Binding b : getBindings())
+            {
+                _virtualHost.getBindingFactory().removeBinding(b);
+            }
+
             SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
 
             while (subscriptionIter.advance())
@@ -1188,8 +1300,8 @@
                 }
             }
 
-            _bindings.deregister();
             _virtualHost.getQueueRegistry().unregisterQueue(_name);
+            getConfigStore().removeConfiguredObject(this);
 
             List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
             {
@@ -1214,7 +1326,7 @@
                 for(final QueueEntry entry : entries)
                 {
                     adapter.setEntry(entry);
-                    final List<AMQQueue> rerouteQueues = _alternateExchange.route(adapter);
+                    final List<? extends BaseQueue> rerouteQueues = _alternateExchange.route(adapter);
                     final ServerMessage message = entry.getMessage();
                     if(rerouteQueues != null & rerouteQueues.size() != 0)
                     {
@@ -1226,9 +1338,9 @@
                                         {
                                             try
                                             {
-                                                for(AMQQueue queue : rerouteQueues)
+                                                for(BaseQueue queue : rerouteQueues)
                                                 {
-                                                    QueueEntry entry = queue.enqueue(message);
+                                                    queue.enqueue(message);
                                                 }
                                             }
                                             catch (AMQException e)
@@ -1479,7 +1591,7 @@
         // next entry they are interested in yet.  This would lead to holding on to references to expired messages, etc
         // which would give us memory "leak".
 
-        if (!isExclusiveSubscriber())
+        if (!hasExclusiveSubscriber())
         {
             advanceAllSubscriptions();
         }
@@ -1820,7 +1932,7 @@
     public void setFlowResumeCapacity(long flowResumeCapacity)
     {
         _flowResumeCapacity = flowResumeCapacity;
-        
+
         checkCapacity();
     }
 
@@ -1919,9 +2031,50 @@
     }
 
 
+    public ConfigStore getConfigStore()
+    {
+        return getVirtualHost().getConfigStore();
+    }
+    /** Returns the value of _dequeueCount, which decrementQueueCount() increments once per call. */
+    public long getMessageDequeueCount()
+    {
+        return  _dequeueCount.get();
+    }
+    /** Returns the running sum of message.getSize() accumulated by every incrementQueueSize() call. */
+    public long getTotalEnqueueSize()
+    {
+        return _enqueueSize.get();
+    }
+    /** Returns the running sum of message.getSize() accumulated by every decrementQueueSize() call. */
+    public long getTotalDequeueSize()
+    {
+        return _dequeueSize.get();
+    }
+    /** Returns the summed size of enqueues for which message.isPersistent() && isDurable() held. */
+    public long getPersistentByteEnqueues()
+    {
+        return _persistentMessageEnqueueSize.get();
+    }
+    /** Returns the summed size of dequeues for which message.isPersistent() && isDurable() held. */
+    public long getPersistentByteDequeues()
+    {
+        return _persistentMessageDequeueSize.get();
+    }
+    /** Returns how many incrementQueueSize() calls saw message.isPersistent() && isDurable(). */
+    public long getPersistentMsgEnqueues()
+    {
+        return _persistentMessageEnqueueCount.get();
+    }
+    /** Returns how many decrementQueueSize() calls saw message.isPersistent() && isDurable(). */
+    public long getPersistentMsgDequeues()
+    {
+        return _persistentMessageDequeueCount.get();
+    }
+
+
     @Override
     public String toString()
     {
-        return String.valueOf(getName());
+        return String.valueOf(getNameShortString());
     }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java Sun Jan 31 00:31:49 2010
@@ -61,11 +61,9 @@
     {
         _deletes.incrementAndGet();
         QueueEntryImpl head = _head.nextNode();
-        boolean deleted = head.isDeleted();
         while(head._next != null && head.isDeleted())
         {
 
-            deleted = true;
             final QueueEntryImpl newhead = head.nextNode();
             if(newhead != null)
             {
@@ -77,11 +75,6 @@
             head = _head.nextNode();
         }
 
-        if(!deleted)
-        {
-            deleted = true;
-        }
-
         if(_deletes.get() > 1000L)
         {
             _deletes.set(0L);
@@ -135,7 +128,7 @@
 
     public QueueEntry add(ServerMessage message)
     {
-        QueueEntryImpl node = new QueueEntryImpl(this, message);
+        QueueEntryImpl node = createQueueEntry(message);
         for (;;)
         {
             QueueEntryImpl tail = _tail;
@@ -160,6 +153,11 @@
         }
     }
 
+    protected QueueEntryImpl createQueueEntry(ServerMessage message)
+    {
+        return new QueueEntryImpl(this, message); // add() obtains each new node through this method
+    }
+
     public QueueEntry next(QueueEntry node)
     {
         return ((QueueEntryImpl)node).getNext();

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java Sun Jan 31 00:31:49 2010
@@ -20,24 +20,33 @@
  */
 package org.apache.qpid.server.registry;
 
-import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.util.Map;
-
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.log4j.Logger;
+
+import org.apache.qpid.qmf.QMFService;
+import org.apache.qpid.server.configuration.BrokerConfig;
+import org.apache.qpid.server.configuration.ConfigStore;
 import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.configuration.SystemConfig;
+import org.apache.qpid.server.configuration.SystemConfigImpl;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.BrokerMessages;
 import org.apache.qpid.server.management.ManagedObjectRegistry;
 import org.apache.qpid.server.plugins.PluginManager;
 import org.apache.qpid.server.security.access.ACLManager;
 import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
 import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.transport.QpidAcceptor;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-import org.apache.qpid.server.logging.RootMessageLogger;
-import org.apache.qpid.server.logging.messages.BrokerMessages;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.transport.QpidAcceptor;
+
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
 
 /**
  * An abstract application registry that provides access to configuration information and handles the
@@ -75,6 +84,14 @@
 
     protected RootMessageLogger _rootMessageLogger;
 
+    protected UUID _brokerId = UUID.randomUUID();
+
+    protected QMFService _qmfService;
+
+    private BrokerConfig _broker;
+
+    private ConfigStore _configStore;
+
     static
     {
         Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownService()));
@@ -100,6 +117,16 @@
             _logger.info("Initialising Application Registry:" + instanceID);
             _instanceMap.put(instanceID, instance);
 
+            final ConfigStore store = ConfigStore.newInstance();
+            store.setRoot(new SystemConfigImpl(store));
+            instance.setConfigStore(store);
+
+            BrokerConfig broker = new BrokerConfigAdapter(instance, instanceID);
+
+            SystemConfig system = (SystemConfig) store.getRoot();
+            system.addBroker(broker);
+            instance.setBroker(broker);
+
             try
             {
                 instance.initialise(instanceID);
@@ -107,7 +134,14 @@
             catch (Exception e)
             {
                 _instanceMap.remove(instanceID);
-                throw e;
+                try
+                {
+                    system.removeBroker(broker);
+                }
+                finally
+                {
+                    throw e;
+                }
             }
         }
         else
@@ -116,6 +150,16 @@
         }
     }
 
+    public ConfigStore getConfigStore()
+    {
+        return _configStore;
+    }
+
+    public void setConfigStore(final ConfigStore configStore)
+    {
+        _configStore = configStore;
+    }
+
     public static boolean isConfigured()
     {
         return isConfigured(DEFAULT_INSTANCE);
@@ -151,6 +195,7 @@
                     _logger.info("Shuting down ApplicationRegistry(" + instanceID + "):" + instance);
                 }
                 instance.close();
+                instance.getBroker().getSystem().removeBroker(instance.getBroker());
             }
         }
         catch (Exception e)
@@ -316,5 +361,32 @@
     {
         return _rootMessageLogger;
     }
-    
+
+    public UUID getBrokerId()
+    {
+        return _brokerId;
+    }
+
+    public QMFService getQMFService()
+    {
+        return _qmfService;
+    }
+
+    public BrokerConfig getBroker()
+    {
+        return _broker;
+    }
+
+    public void setBroker(final BrokerConfig broker)
+    {
+        _broker = broker;
+    }
+    /** Builds a VirtualHostImpl from vhostConfig, registers it in _virtualHostRegistry and adds it to getBroker(). */
+    public VirtualHost createVirtualHost(final VirtualHostConfiguration vhostConfig) throws Exception
+    {
+        VirtualHostImpl virtualHost = new VirtualHostImpl(this, vhostConfig);
+        _virtualHostRegistry.registerVirtualHost(virtualHost);
+        getBroker().addVirtualHost(virtualHost);
+        return virtualHost;
+    }
 }

Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java?rev=904934&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java Sun Jan 31 00:31:49 2010
@@ -0,0 +1,163 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.registry;
+
+import org.apache.qpid.server.configuration.*;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.common.QpidProperties;
+
+import java.util.UUID;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** BrokerConfig backed by an IApplicationRegistry; several limits are hard-coded rather than read from configuration. */
+public class BrokerConfigAdapter implements BrokerConfig
+{
+    private final IApplicationRegistry _instance;
+    private final int _instanceId; // assigned in the constructor; not read anywhere in this class
+    private SystemConfig _system;
+    private final Map<UUID, VirtualHostConfig> _vhosts = new ConcurrentHashMap<UUID, VirtualHostConfig>();
+    private final long _createTime = System.currentTimeMillis();
+    private final UUID _id;
+    private final String _federationTag;
+
+    /** Obtains this broker's id from the registry's ConfigStore and uses a random UUID string as federation tag. */
+    public BrokerConfigAdapter(final IApplicationRegistry instance, final int instanceID)
+    {
+        _instance = instance;
+        _instanceId = instanceID;
+        _id = instance.getConfigStore().createId();
+        _federationTag = UUID.randomUUID().toString();
+    }
+
+    public void setSystem(final SystemConfig system)
+    {
+        _system = system;
+    }
+
+    public SystemConfig getSystem()
+    {
+        return _system;
+    }
+    /** Returns the first entry of the configured port list parsed as an Integer, or 0 when the list is empty. */
+    public Integer getPort()
+    {
+        List<?> ports = _instance.getConfiguration().getPorts();
+        if(ports.size() > 0)
+        {
+            return Integer.valueOf(ports.get(0).toString());
+        }
+        else
+        {
+            return 0;
+        }
+    }
+
+    public Integer getWorkerThreads()
+    {
+        return _instance.getConfiguration().getProcessors();
+    }
+    /** Hard-coded: always returns 0. */
+    public Integer getMaxConnections()
+    {
+        return 0;
+    }
+    /** Hard-coded: always returns 0. */
+    public Integer getConnectionBacklogLimit()
+    {
+        return 0;
+    }
+    /** Hard-coded: always returns 0. */
+    public Long getStagingThreshold()
+    {
+        return 0L;
+    }
+    /** Hard-coded: always returns 10. */
+    public Integer getManagementPublishInterval()
+    {
+        return 10;
+    }
+    /** Returns the release version followed by the build version, formatted as "release [Build: build]". */
+    public String getVersion()
+    {
+        return QpidProperties.getReleaseVersion() + " [Build: " + QpidProperties.getBuildVersion() +  "]";
+    }
+
+    public String getDataDirectory()
+    {
+        return _instance.getConfiguration().getQpidWork();
+    }
+    /** Makes this adapter the virtual host's broker, indexes the host by id and adds it to the ConfigStore. */
+    public void addVirtualHost(final VirtualHostConfig virtualHost)
+    {
+        virtualHost.setBroker(this);
+        _vhosts.put(virtualHost.getId(), virtualHost);
+        getConfigStore().addConfiguredObject(virtualHost);
+    }
+
+    private ConfigStore getConfigStore()
+    {
+        return _instance.getConfigStore();
+    }
+
+    public long getCreateTime()
+    {
+        return _createTime;
+    }
+    /** Delegates to the default virtual host; its fourth argument (presumably the remote vhost - confirm) is "". */
+    public void createBrokerConnection(final String transport,
+                                       final String host,
+                                       final int port,
+                                       final boolean durable,
+                                       final String authMechanism,
+                                       final String username,
+                                       final String password)
+    {
+        VirtualHost vhost = _instance.getVirtualHostRegistry().getDefaultVirtualHost();
+        vhost.createBrokerConnection(transport, host, port, "", durable, authMechanism, username, password);
+    }
+
+    public UUID getId()
+    {
+        return _id;
+    }
+
+    public BrokerConfigType getConfigType()
+    {
+        return BrokerConfigType.getInstance();
+    }
+
+    public ConfiguredObject getParent()
+    {
+        return _system;
+    }
+    /** Hard-coded: always returns false. */
+    public boolean isDurable()
+    {
+        return false;
+    }
+
+    public String getFederationTag()
+    {
+        return _federationTag;
+    }
+}

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java Sun Jan 31 00:31:49 2010
@@ -21,13 +21,15 @@
 package org.apache.qpid.server.registry;
 
 import org.apache.commons.configuration.ConfigurationException;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.common.QpidProperties;
+import org.apache.qpid.qmf.QMFService;
 import org.apache.qpid.server.configuration.ServerConfiguration;
 import org.apache.qpid.server.logging.RootMessageLoggerImpl;
-import org.apache.qpid.server.logging.messages.BrokerMessages;
-import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.actors.BrokerActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.BrokerMessages;
 import org.apache.qpid.server.logging.rawloggers.Log4jMessageLogger;
 import org.apache.qpid.server.management.JMXManagedObjectRegistry;
 import org.apache.qpid.server.management.NoopManagedObjectRegistry;
@@ -36,7 +38,6 @@
 import org.apache.qpid.server.security.auth.database.ConfigurationFilePrincipalDatabaseManager;
 import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 
 import java.io.File;
 
@@ -51,6 +52,9 @@
 
     public void initialise(int instanceID) throws Exception
     {
+        _qmfService = new QMFService(getConfigStore(), this);
+
+
         _rootMessageLogger = new RootMessageLoggerImpl(_configuration,
                                                        new Log4jMessageLogger());
 
@@ -75,6 +79,7 @@
 
         _databaseManager.initialiseManagement(_configuration);
 
+
         _managedObjectRegistry.start();
 
         initialiseVirtualHosts();
@@ -91,6 +96,7 @@
         try
         {
             super.close();
+            _qmfService.close();
         }
         finally
         {
@@ -102,7 +108,7 @@
     {
         for (String name : _configuration.getVirtualHosts())
         {
-            _virtualHostRegistry.registerVirtualHost(new VirtualHostImpl(_configuration.getVirtualHostConfig(name)));
+            createVirtualHost(_configuration.getVirtualHostConfig(name));
         }
         getVirtualHostRegistry().setDefaultVirtualHostName(_configuration.getDefaultVirtualHost());
     }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java?rev=904934&r1=904933&r2=904934&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java Sun Jan 31 00:31:49 2010
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,22 +20,25 @@
  */
 package org.apache.qpid.server.registry;
 
-import java.util.Collection;
-import java.net.InetSocketAddress;
-
-import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
+
+import org.apache.qpid.qmf.QMFService;
+import org.apache.qpid.server.configuration.BrokerConfig;
+import org.apache.qpid.server.configuration.ConfigStore;
 import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.logging.RootMessageLogger;
 import org.apache.qpid.server.management.ManagedObjectRegistry;
 import org.apache.qpid.server.plugins.PluginManager;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
-import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
 import org.apache.qpid.server.security.access.ACLManager;
-import org.apache.qpid.server.security.access.ACLPlugin;
-import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
 import org.apache.qpid.server.transport.QpidAcceptor;
-import org.apache.mina.common.IoAcceptor;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+
+import java.net.InetSocketAddress;
+import java.util.UUID;
 
 public interface IApplicationRegistry
 {
@@ -81,4 +84,17 @@
      */
     void addAcceptor(InetSocketAddress bindAddress, QpidAcceptor acceptor);
 
+    UUID getBrokerId(); // ApplicationRegistry returns a UUID picked at random when the registry object is created
+
+    QMFService getQMFService();
+
+    void setBroker(BrokerConfig broker);
+
+    BrokerConfig getBroker();
+
+    VirtualHost createVirtualHost(VirtualHostConfiguration vhostConfig) throws Exception;
+
+    ConfigStore getConfigStore();
+
+    void setConfigStore(ConfigStore store);
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org