You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/08/15 05:41:24 UTC

svn commit: r686136 [8/17] - in /incubator/qpid/branches/qpid.0-10/java: ./ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/ broker/bin/ broker/etc/ broker/...

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Thu Aug 14 20:40:49 2008
@@ -1,173 +1,184 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.store.StoreContext;
-import org.apache.log4j.Logger;
-
-import java.util.Set;
-import java.util.HashSet;
-import java.util.concurrent.atomic.AtomicReference;
-
+import org.apache.qpid.server.subscription.Subscription;
 
-public class QueueEntry
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+public interface QueueEntry extends Comparable<QueueEntry>
 {
 
-    /**
-     * Used for debugging purposes.
-     */
-    private static final Logger _log = Logger.getLogger(QueueEntry.class);
-
-    private final AMQQueue _queue;
-    private final AMQMessage _message;
 
-    private Set<Subscription> _rejectedBy = null;
 
-    private AtomicReference<Object> _owner = new AtomicReference<Object>();
-
-
-    public QueueEntry(AMQQueue queue, AMQMessage message)
+    public static enum State
     {
-        _queue = queue;
-        _message = message;
+        AVAILABLE,
+        ACQUIRED,
+        EXPIRED,
+        DEQUEUED,
+        DELETED
     }
 
-
-    public AMQQueue getQueue()
+    public static interface StateChangeListener
     {
-        return _queue;
+        public void stateChanged(QueueEntry entry, State oldSate, State newState);
     }
 
-    public AMQMessage getMessage()
+    public abstract class EntryState
     {
-        return _message;
-    }
+        private EntryState()
+        {
+        }
 
-    public long getSize()
-    {
-        return getMessage().getSize();
+        public abstract State getState();
     }
 
-    public boolean getDeliveredToConsumer()
-    {
-        return getMessage().getDeliveredToConsumer();
-    }
 
-    public boolean expired() throws AMQException
+    public final class AvailableState extends EntryState
     {
-        return getMessage().expired(_queue);
-    }
 
-    public boolean isTaken()
-    {
-        return _owner.get() != null;
+        public State getState()
+        {
+            return State.AVAILABLE;
+        }
     }
 
-    public boolean taken(Subscription sub)
-    {
-        return !(_owner.compareAndSet(null, sub == null ? this : sub));
-    }
 
-    public void setDeliveredToConsumer()
+    public final class DequeuedState extends EntryState
     {
-        getMessage().setDeliveredToConsumer();
-    }
 
-    public void release()
-    {
-        _owner.set(null);
+        public State getState()
+        {
+            return State.DEQUEUED;
+        }
     }
 
-    public String debugIdentity()
-    {
-        return getMessage().debugIdentity();
-    }
 
-    public void process(StoreContext storeContext, boolean deliverFirst) throws AMQException
+    public final class DeletedState extends EntryState
     {
-        _queue.process(storeContext, this, deliverFirst);
-    }
 
-    public void checkDeliveredToConsumer() throws NoConsumersException
-    {
-        _message.checkDeliveredToConsumer();
+        public State getState()
+        {
+            return State.DELETED;
+        }
     }
 
-    public void setRedelivered(boolean b)
+    public final class ExpiredState extends EntryState
     {
-        getMessage().setRedelivered(b);
-    }
 
-    public Subscription getDeliveredSubscription()
-    {
-        synchronized (this)
+        public State getState()
         {
-            Object owner = _owner.get();
-            if (owner instanceof Subscription)
-            {
-                return (Subscription) owner;
-            }
-            else
-            {
-                return null;
-            }
+            return State.EXPIRED;
         }
     }
 
-    public void reject()
+
+    public final class NonSubscriptionAcquiredState extends EntryState
     {
-        reject(getDeliveredSubscription());
+        public State getState()
+        {
+            return State.ACQUIRED;
+        }
     }
 
-    public void reject(Subscription subscription)
+    public final class SubscriptionAcquiredState extends EntryState
     {
-        if (subscription != null)
-        {
-            if (_rejectedBy == null)
-            {
-                _rejectedBy = new HashSet<Subscription>();
-            }
+        private final Subscription _subscription;
 
-            _rejectedBy.add(subscription);
-        }
-        else
+        public SubscriptionAcquiredState(Subscription subscription)
         {
-            _log.warn("Requesting rejection by null subscriber:" + debugIdentity());
+            _subscription = subscription;
         }
-    }
 
-    public boolean isRejectedBy(Subscription subscription)
-    {
-        boolean rejected = _rejectedBy != null;
 
-        if (rejected) // We have subscriptions that rejected this message
+        public State getState()
         {
-            return _rejectedBy.contains(subscription);
+            return State.ACQUIRED;
         }
-        else // This messasge hasn't been rejected yet.
+
+        public Subscription getSubscription()
         {
-            return rejected;
+            return _subscription;
         }
     }
 
 
+    final static EntryState AVAILABLE_STATE = new AvailableState();
+    final static EntryState DELETED_STATE = new DeletedState();
+    final static EntryState DEQUEUED_STATE = new DequeuedState();
+    final static EntryState EXPIRED_STATE = new ExpiredState();
+    final static EntryState NON_SUBSCRIPTION_ACQUIRED_STATE = new NonSubscriptionAcquiredState();
+
+
+
+
+    AMQQueue getQueue();
+
+    AMQMessage getMessage();
+
+    long getSize();
+
+    boolean getDeliveredToConsumer();
+
+    boolean expired() throws AMQException;
+
+    boolean isAcquired();
+
+    boolean acquire();
+    boolean acquire(Subscription sub);
+
+    boolean delete();
+    boolean isDeleted();
+
+    boolean acquiredBySubscription();
+
+    void setDeliveredToSubscription();
+
+    void release();
+
+    String debugIdentity();
+
+    boolean immediateAndNotDelivered();
+
+    void setRedelivered(boolean b);
+
+    Subscription getDeliveredSubscription();
+
+    void reject();
+
+    void reject(Subscription subscription);
+
+    boolean isRejectedBy(Subscription subscription);
+
+    void requeue(StoreContext storeContext) throws AMQException;
+
+    void dequeue(final StoreContext storeContext) throws FailedDequeueException;
+
+    void dispose(final StoreContext storeContext) throws MessageCleanupException;
+
+    void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException;
+
+    boolean isQueueDeleted();
+
+    void addStateChangeListener(StateChangeListener listener);
+    boolean removeStateChangeListener(StateChangeListener listener);
 }

Propchange: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
            ('svn:eol-style' removed)

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java Thu Aug 14 20:40:49 2008
@@ -1,27 +1,27 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-
-public interface QueueNotificationListener
-{
-    void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg);
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+
+public interface QueueNotificationListener
+{
+    void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg);
+}

Propchange: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java
            ('svn:eol-style' removed)

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java Thu Aug 14 20:40:49 2008
@@ -26,7 +26,6 @@
 import java.util.List;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.framing.abstraction.ContentChunk;
@@ -48,29 +47,35 @@
 
     private final MessageStore _messageStore;
 
+    private final Long _messageId;
     private long _arrivalTime;
 
-
-    public WeakReferenceMessageHandle(MessageStore messageStore)
+    public WeakReferenceMessageHandle(final Long messageId, MessageStore messageStore)
     {
+        _messageId = messageId;
         _messageStore = messageStore;
     }
 
-    public ContentHeaderBody getContentHeaderBody(StoreContext context, Long messageId) throws AMQException
+    public ContentHeaderBody getContentHeaderBody(StoreContext context) throws AMQException
     {
         ContentHeaderBody chb = (_contentHeaderBody != null ? _contentHeaderBody.get() : null);
         if (chb == null)
         {
-            MessageMetaData mmd = loadMessageMetaData(context, messageId);
+            MessageMetaData mmd = loadMessageMetaData(context);
             chb = mmd.getContentHeaderBody();
         }
         return chb;
     }
 
-    private MessageMetaData loadMessageMetaData(StoreContext context, Long messageId)
+    public Long getMessageId()
+    {
+        return _messageId;
+    }
+
+    private MessageMetaData loadMessageMetaData(StoreContext context)
             throws AMQException
     {
-        MessageMetaData mmd = _messageStore.getMessageMetaData(context, messageId);
+        MessageMetaData mmd = _messageStore.getMessageMetaData(context, _messageId);
         populateFromMessageMetaData(mmd);
         return mmd;
     }
@@ -82,11 +87,11 @@
         _messagePublishInfo = new WeakReference<MessagePublishInfo>(mmd.getMessagePublishInfo());
     }
 
-    public int getBodyCount(StoreContext context, Long messageId) throws AMQException
+    public int getBodyCount(StoreContext context) throws AMQException
     {
         if (_contentBodies == null)
         {
-            MessageMetaData mmd = _messageStore.getMessageMetaData(context, messageId);
+            MessageMetaData mmd = _messageStore.getMessageMetaData(context, _messageId);
             int chunkCount = mmd.getContentChunkCount();
             _contentBodies = new ArrayList<WeakReference<ContentChunk>>(chunkCount);
             for (int i = 0; i < chunkCount; i++)
@@ -97,12 +102,12 @@
         return _contentBodies.size();
     }
 
-    public long getBodySize(StoreContext context, Long messageId) throws AMQException
+    public long getBodySize(StoreContext context) throws AMQException
     {
-        return getContentHeaderBody(context, messageId).bodySize;
+        return getContentHeaderBody(context).bodySize;
     }
 
-    public ContentChunk getContentChunk(StoreContext context, Long messageId, int index) throws AMQException, IllegalArgumentException
+    public ContentChunk getContentChunk(StoreContext context, int index) throws AMQException, IllegalArgumentException
     {
         if (index > _contentBodies.size() - 1)
         {
@@ -113,7 +118,7 @@
         ContentChunk cb = wr.get();
         if (cb == null)
         {
-            cb = _messageStore.getContentBodyChunk(context, messageId, index);
+            cb = _messageStore.getContentBodyChunk(context, _messageId, index);
             _contentBodies.set(index, new WeakReference<ContentChunk>(cb));
         }
         return cb;
@@ -123,12 +128,11 @@
      * Content bodies are set <i>before</i> the publish and header frames
      *
      * @param storeContext
-     * @param messageId
      * @param contentChunk
      * @param isLastContentBody
      * @throws AMQException
      */
-    public void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentChunk contentChunk, boolean isLastContentBody) throws AMQException
+    public void addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk, boolean isLastContentBody) throws AMQException
     {
         if (_contentBodies == null && isLastContentBody)
         {
@@ -142,16 +146,16 @@
             }
         }
         _contentBodies.add(new WeakReference<ContentChunk>(contentChunk));
-        _messageStore.storeContentBodyChunk(storeContext, messageId, _contentBodies.size() - 1,
+        _messageStore.storeContentBodyChunk(storeContext, _messageId, _contentBodies.size() - 1,
                                             contentChunk, isLastContentBody);
     }
 
-    public MessagePublishInfo getMessagePublishInfo(StoreContext context, Long messageId) throws AMQException
+    public MessagePublishInfo getMessagePublishInfo(StoreContext context) throws AMQException
     {
         MessagePublishInfo bpb = (_messagePublishInfo != null ? _messagePublishInfo.get() : null);
         if (bpb == null)
         {
-            MessageMetaData mmd = loadMessageMetaData(context, messageId);
+            MessageMetaData mmd = loadMessageMetaData(context);
 
             bpb = mmd.getMessagePublishInfo();
         }
@@ -168,12 +172,9 @@
         _redelivered = redelivered;
     }
 
-    public boolean isPersistent(StoreContext context, Long messageId) throws AMQException
+    public boolean isPersistent()
     {
-        //todo remove literal values to a constant file such as AMQConstants in common
-        ContentHeaderBody chb = getContentHeaderBody(context, messageId);
-        return chb.properties instanceof BasicContentHeaderProperties &&
-               ((BasicContentHeaderProperties) chb.properties).getDeliveryMode() == 2;
+        return true;
     }
 
     /**
@@ -183,7 +184,7 @@
      * @param contentHeaderBody
      * @throws AMQException
      */
-    public void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, MessagePublishInfo publishBody,
+    public void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo publishBody,
                                                ContentHeaderBody contentHeaderBody)
             throws AMQException
     {
@@ -199,24 +200,15 @@
 
         MessageMetaData mmd = new MessageMetaData(publishBody, contentHeaderBody, _contentBodies.size(), arrivalTime);
 
-        _messageStore.storeMessageMetaData(storeContext, messageId, mmd);
+        _messageStore.storeMessageMetaData(storeContext, _messageId, mmd);
 
-        populateFromMessageMetaData(mmd);
-    }
 
-    public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException
-    {
-        _messageStore.removeMessage(storeContext, messageId);
-    }
-
-    public void enqueue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException
-    {
-        _messageStore.enqueueMessage(storeContext, queue.getName(), messageId);
+        populateFromMessageMetaData(mmd);
     }
 
-    public void dequeue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException
+    public void removeMessage(StoreContext storeContext) throws AMQException
     {
-        _messageStore.dequeueMessage(storeContext, queue.getName(), messageId);
+        _messageStore.removeMessage(storeContext, _messageId);
     }
 
     public long getArrivalTime()

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java Thu Aug 14 20:40:49 2008
@@ -24,9 +24,17 @@
 import org.apache.log4j.Logger;
 import org.apache.qpid.server.configuration.Configurator;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.management.ManagedObjectRegistry;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
+import org.apache.qpid.server.security.access.ACLPlugin;
+import org.apache.qpid.server.plugins.PluginManager;
+import org.apache.mina.common.IoAcceptor;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.net.InetSocketAddress;
 
 /**
  * An abstract application registry that provides access to configuration information and handles the
@@ -36,7 +44,7 @@
  */
 public abstract class ApplicationRegistry implements IApplicationRegistry
 {
-    private static final Logger _logger = Logger.getLogger(ApplicationRegistry.class);
+    protected static final Logger _logger = Logger.getLogger(ApplicationRegistry.class);
 
     private static Map<Integer, IApplicationRegistry> _instanceMap = new HashMap<Integer, IApplicationRegistry>();
 
@@ -48,6 +56,20 @@
     public static final String DEFAULT_APPLICATION_REGISTRY = "org.apache.qpid.server.util.NullApplicationRegistry";
     public static String _APPLICATION_REGISTRY = DEFAULT_APPLICATION_REGISTRY;
 
+    protected final Map<InetSocketAddress, IoAcceptor> _acceptors = new HashMap<InetSocketAddress, IoAcceptor>();
+
+    protected ManagedObjectRegistry _managedObjectRegistry;
+
+    protected AuthenticationManager _authenticationManager;
+
+    protected VirtualHostRegistry _virtualHostRegistry;
+
+    protected ACLPlugin _accessManager;
+
+    protected PrincipalDatabaseManager _databaseManager;
+
+    protected PluginManager _pluginManager;
+
     static
     {
         Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownService()));
@@ -57,7 +79,6 @@
     {
         public void run()
         {
-            _logger.info("Shutting down application registries...");
             removeAll();
         }
     }
@@ -90,16 +111,29 @@
         }
     }
 
+    /**
+     * Method to cleanly shut down the specified registry running in this JVM
+     *
+     * @param instanceID the instance to shutdown
+     */
+
     public static void remove(int instanceID)
     {
         try
         {
-            _instanceMap.get(instanceID).close();
+            IApplicationRegistry instance = _instanceMap.get(instanceID);
+            if (instance != null)
+            {
+                if (_logger.isInfoEnabled())
+                {
+                    _logger.info("Shuting down ApplicationRegistry(" + instanceID + "):" + instance);
+                }
+                instance.close();
+            }
         }
         catch (Exception e)
         {
-            _logger.error("Error shutting down message store: " + e, e);
-
+            _logger.error("Error shutting down Application Registry(" + instanceID + "): " + e, e);
         }
         finally
         {
@@ -107,6 +141,7 @@
         }
     }
 
+    /** Method to cleanly shut down all registries currently running in this JVM */
     public static void removeAll()
     {
         Object[] keys = _instanceMap.keySet().toArray();
@@ -158,15 +193,36 @@
 
     public void close() throws Exception
     {
+        if (_logger.isInfoEnabled())
+        {
+            _logger.info("Shutting down ApplicationRegistry:"+this);
+        }
+
+        //Stop incoming connections
+        unbind();
+
+        //Shutdown virtualhosts
         for (VirtualHost virtualHost : getVirtualHostRegistry().getVirtualHosts())
         {
             virtualHost.close();
         }
 
         // close the rmi registry(if any) started for management
-        if (getInstance().getManagedObjectRegistry() != null)
+        if (getManagedObjectRegistry() != null)
+        {
+            getManagedObjectRegistry().close();
+        }
+    }
+
+    private void unbind()
+    {
+        synchronized (_acceptors)
         {
-            getInstance().getManagedObjectRegistry().close();
+            for (InetSocketAddress bindAddress : _acceptors.keySet())
+            {
+                IoAcceptor acceptor = _acceptors.get(bindAddress);
+                acceptor.unbind(bindAddress);
+            }
         }
     }
 
@@ -175,6 +231,14 @@
         return _configuration;
     }
 
+    public void addAcceptor(InetSocketAddress bindAddress, IoAcceptor acceptor)
+    {
+        synchronized (_acceptors)
+        {
+            _acceptors.put(bindAddress, acceptor);
+        }
+    }
+
     public <T> T getConfiguredObject(Class<T> instanceType)
     {
         T instance = (T) _configuredObjects.get(instanceType);
@@ -195,9 +259,39 @@
         return instance;
     }
 
-
     public static void setDefaultApplicationRegistry(String clazz)
     {
         _APPLICATION_REGISTRY = clazz;
     }
+
+    public VirtualHostRegistry getVirtualHostRegistry()
+    {
+        return _virtualHostRegistry;
+    }
+
+    public ACLPlugin getAccessManager()
+    {
+        return _accessManager;
+    }
+
+    public ManagedObjectRegistry getManagedObjectRegistry()
+    {
+        return _managedObjectRegistry;
+    }
+
+    public PrincipalDatabaseManager getDatabaseManager()
+    {
+        return _databaseManager;
+    }
+
+    public AuthenticationManager getAuthenticationManager()
+    {
+        return _authenticationManager;
+    }
+
+    public PluginManager getPluginManager()
+    {
+        return _pluginManager;
+    }
+
 }

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java Thu Aug 14 20:40:49 2008
@@ -48,23 +48,6 @@
 public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
 {
 
-    private ManagedObjectRegistry _managedObjectRegistry;
-
-    private AuthenticationManager _authenticationManager;
-
-    private ACLPlugin _accessManager;
-
-    private PrincipalDatabaseManager _databaseManager;
-
-    private VirtualHostRegistry _virtualHostRegistry;
-
-    private PluginManager _pluginManager;
-
-
-    public ConfigurationFileApplicationRegistry(Configuration configuration)
-    {
-        super(configuration);
-    }
 
     public ConfigurationFileApplicationRegistry(File configurationURL) throws ConfigurationException
     {
@@ -81,7 +64,7 @@
         }
     }
 
-    public static final Configuration config(File url) throws ConfigurationException
+    private static final Configuration config(File url) throws ConfigurationException
     {
         // We have to override the interpolate methods so that
         // interpolation takes place accross the entirety of the
@@ -150,39 +133,9 @@
         }
     }
 
-
-    public VirtualHostRegistry getVirtualHostRegistry()
-    {
-        return _virtualHostRegistry;
-    }
-
-    public ACLPlugin getAccessManager()
-    {
-        return _accessManager;
-    }
-
-    public ManagedObjectRegistry getManagedObjectRegistry()
-    {
-        return _managedObjectRegistry;
-    }
-
-    public PrincipalDatabaseManager getDatabaseManager()
-    {
-        return _databaseManager;
-    }
-
-    public AuthenticationManager getAuthenticationManager()
-    {
-        return _authenticationManager;
-    }
-
     public Collection<String> getVirtualHostNames()
     {
         return getConfiguration().getList("virtualhosts.virtualhost.name");
     }
 
-    public PluginManager getPluginManager()
-    {
-        return _pluginManager;
-    }
 }

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java Thu Aug 14 20:40:49 2008
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.registry;
 
 import java.util.Collection;
+import java.net.InetSocketAddress;
 
 import org.apache.commons.configuration.Configuration;
 import org.apache.qpid.server.management.ManagedObjectRegistry;
@@ -29,6 +30,7 @@
 import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
 import org.apache.qpid.server.security.access.ACLPlugin;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.mina.common.IoAcceptor;
 
 public interface IApplicationRegistry
 {
@@ -39,6 +41,10 @@
      */
     void initialise() throws Exception;
 
+    /**
+     * Shutdown this Registry
+     * @throws Exception - //fixme needs to be made more specific
+     */
     void close() throws Exception;
 
     /**
@@ -71,5 +77,12 @@
     ACLPlugin getAccessManager();
 
     PluginManager getPluginManager();
-    
+
+    /**
+     * Register any acceptors for this registry
+     * @param bindAddress The address that the acceptor has been bound with
+     * @param acceptor The acceptor in use
+     */
+    void addAcceptor(InetSocketAddress bindAddress, IoAcceptor acceptor);
+
 }

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java Thu Aug 14 20:40:49 2008
@@ -27,23 +27,23 @@
         GRANTED, REFUSED
     }
 
-    StringBuilder _authorizer;
-    AccessStatus _status;
+    private String _authorizer;
+    private AccessStatus _status;
 
     public AccessResult(ACLPlugin authorizer, AccessStatus status)
     {
         _status = status;
-        _authorizer = new StringBuilder(authorizer.getPluginName());
+        _authorizer = authorizer.getPluginName();
     }
 
     public void setAuthorizer(ACLPlugin authorizer)
     {
-        _authorizer.append(authorizer.getPluginName());
+        _authorizer += authorizer.getPluginName();
     }
 
     public String getAuthorizer()
     {
-        return _authorizer.toString();
+        return _authorizer;
     }
 
     public void setStatus(AccessStatus status)
@@ -58,8 +58,7 @@
 
     public void addAuthorizer(ACLPlugin accessManager)
     {
-        _authorizer.insert(0, "->");
-        _authorizer.insert(0, accessManager.getPluginName());
+        _authorizer = accessManager.getPluginName() + "->" + _authorizer;
     }
 
 

Propchange: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java
            ('svn:eol-style' removed)

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java Thu Aug 14 20:40:49 2008
@@ -28,13 +28,9 @@
 import org.apache.qpid.server.security.access.Accessable;
 import org.apache.qpid.server.security.access.Permission;
 import org.apache.commons.configuration.Configuration;
-import org.apache.log4j.Logger;
 
 public class AllowAll implements ACLPlugin
 {
-
-    private static final Logger _logger = ACLManager.getLogger();
-
     public AccessResult authorise(AMQProtocolSession session, Permission permission, AMQMethodBody body, Object... parameters)
     {
         if (ACLManager.getLogger().isDebugEnabled())

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/auth/AuthenticationResult.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/auth/AuthenticationResult.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/auth/AuthenticationResult.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/auth/AuthenticationResult.java Thu Aug 14 20:40:49 2008
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.server.security.auth;
 
+import javax.security.sasl.SaslException;
+
 public class AuthenticationResult
 {
     public enum AuthenticationStatus
@@ -29,15 +31,33 @@
 
     public AuthenticationStatus status;
     public byte[] challenge;
+    
+    private Exception cause;
+
+    public AuthenticationResult(AuthenticationStatus status)
+    {
+        this(null, status, null);
+    }
 
     public AuthenticationResult(byte[] challenge, AuthenticationStatus status)
     {
+        this(challenge, status, null);
+    }
+
+    public AuthenticationResult(AuthenticationStatus error, Exception cause)
+    {
+        this(null, error, cause);
+    }
+
+    public AuthenticationResult(byte[] challenge, AuthenticationStatus status, Exception cause)
+    {
         this.status = status;
         this.challenge = challenge;
+        this.cause = cause;
     }
 
-    public AuthenticationResult(AuthenticationStatus status)
+    public Exception getCause()
     {
-        this.status = status;
+        return cause;
     }
 }

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java Thu Aug 14 20:40:49 2008
@@ -230,12 +230,7 @@
         }
         catch (SaslException e)
         {
-            return new AuthenticationResult(AuthenticationResult.AuthenticationStatus.ERROR);
+            return new AuthenticationResult(AuthenticationResult.AuthenticationStatus.ERROR, e);
         }
     }
-
-    public AuthenticationResult isAuthorize(VirtualHost vhost, String username)
-    {
-        return new AuthenticationResult(AuthenticationResult.AuthenticationStatus.ERROR);
-    }
 }

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/amqplain/AmqPlainSaslServer.java Thu Aug 14 20:40:49 2008
@@ -68,12 +68,15 @@
             PasswordCallback passwordCb = new PasswordCallback("prompt", false);
             // TODO: should not get pwd as a String but as a char array...
             String pwd = (String) ft.getString("PASSWORD");
-            passwordCb.setPassword(pwd.toCharArray());
             AuthorizeCallback authzCb = new AuthorizeCallback(username, username);
             Callback[] callbacks = new Callback[]{nameCb, passwordCb, authzCb};
             _cbh.handle(callbacks);
-            _complete = true;
-            if (authzCb.isAuthorized())
+            String storedPwd = new String(passwordCb.getPassword());
+            if (storedPwd.equals(pwd))
+            {
+                _complete = true;
+            }
+            if (authzCb.isAuthorized() && _complete)
             {
                 _authorizationId = authzCb.getAuthenticationID();
                 return null;

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServer.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServer.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/plain/PlainSaslServer.java Thu Aug 14 20:40:49 2008
@@ -72,17 +72,19 @@
 
             // we do not care about the prompt but it throws if null
             NameCallback nameCb = new NameCallback("prompt", authzid);
-            // we do not care about the prompt but it throws if null
             PasswordCallback passwordCb = new PasswordCallback("prompt", false);
             // TODO: should not get pwd as a String but as a char array...
             int passwordLen = response.length - authcidNullPosition - 1;
             String pwd = new String(response, authcidNullPosition + 1, passwordLen, "utf8");
-            passwordCb.setPassword(pwd.toCharArray());
             AuthorizeCallback authzCb = new AuthorizeCallback(authzid, authzid);
             Callback[] callbacks = new Callback[]{nameCb, passwordCb, authzCb};
             _cbh.handle(callbacks);
-            _complete = true;
-            if (authzCb.isAuthorized())
+            String storedPwd = new String(passwordCb.getPassword());
+            if (storedPwd.equals(pwd))
+            {
+                _complete = true;
+            }
+            if (authzCb.isAuthorized() && _complete)
             {
                 _authorizationId = authzCb.getAuthenticationID();
                 return null;

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java Thu Aug 14 20:40:49 2008
@@ -26,9 +26,9 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.server.queue.MessageMetaData;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.MessageMetaData;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.ArrayList;
@@ -121,22 +121,28 @@
 
     }
 
+
     public void createQueue(AMQQueue queue) throws AMQException
     {
+        // Not required to do anything
+    }
+
+    public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
+    {
         // Not required to do anything
     }
 
-    public void removeQueue(AMQShortString name) throws AMQException
+    public void removeQueue(final AMQQueue queue) throws AMQException
     {
         // Not required to do anything
     }
 
-    public void enqueueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException
+    public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
     {
         // Not required to do anything
     }
 
-    public void dequeueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException
+    public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
     {
         // Not required to do anything
     }
@@ -213,7 +219,12 @@
         return bodyList.get(index);
     }
 
-     private void checkNotClosed() throws MessageStoreClosedException
+    public boolean isPersistent()
+    {
+        return false;
+    }
+
+    private void checkNotClosed() throws MessageStoreClosedException
      {
         if (_closed.get())
         {

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java Thu Aug 14 20:40:49 2008
@@ -27,8 +27,8 @@
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 /**
@@ -136,35 +136,42 @@
     void createQueue(AMQQueue queue) throws AMQException;
 
     /**
-     * Removes the specified queue from the persistent store.
+     * Makes the specified queue persistent.
      *
-     * @param name The queue to remove.
+     * @param queue The queue to store.
      *
+     * @param arguments The additional arguments to the binding
      * @throws AMQException If the operation fails for any reason.
      */
-    void removeQueue(AMQShortString name) throws AMQException;
+    void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException;
+
+    /**
+     * Removes the specified queue from the persistent store.
+     *
+     * @param queue The queue to remove.
+     * @throws AMQException If the operation fails for any reason.
+     */
+    void removeQueue(final AMQQueue queue) throws AMQException;
 
     /**
      * Places a message onto a specified queue, in a given transactional context.
      *
      * @param context   The transactional context for the operation.
-     * @param name      The name of the queue to place the message on.
+     * @param queue     The queue to place the message on.
      * @param messageId The message to enqueue.
-     *
      * @throws AMQException If the operation fails for any reason.
      */
-    void enqueueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException;
+    void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException;
 
     /**
      * Extracts a message from a specified queue, in a given transactional context.
      *
      * @param context   The transactional context for the operation.
-     * @param name      The name of the queue to take the message from.
+     * @param queue     The queue to take the message from.
      * @param messageId The message to dequeue.
-     *
      * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
      */
-    void dequeueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException;
+    void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException;
 
     /**
      * Begins a transactional context.
@@ -258,4 +265,12 @@
      * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
      */
     ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException;
+
+    /**
+     * Is this store capable of persisting the data
+     * 
+     * @return true if this store is capable of persisting data
+     */
+    boolean isPersistent();
+
 }

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java Thu Aug 14 20:40:49 2008
@@ -37,7 +37,7 @@
 
     public StoreContext()
     {
-        _name = super.toString();
+        _name = "StoreContext";
     }
 
     public StoreContext(String name)
@@ -52,7 +52,10 @@
 
     public void setPayload(Object payload)
     {
-        _logger.debug("public void setPayload(Object payload = " + payload + "): called");
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("public void setPayload(Object payload = " + payload + "): called");
+        }
         _payload = payload;
     }
 

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java Thu Aug 14 20:40:49 2008
@@ -97,6 +97,10 @@
                 defaultValue = "false")
     public boolean _multiThreadNIO;
 
+    @Configured(path = "advanced.useWriteBiasedPool",
+                    defaultValue = "false")        
+    public boolean useBiasedWrites;
+
 
     public IoAcceptor createAcceptor()
     {

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java Thu Aug 14 20:40:49 2008
@@ -24,18 +24,16 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.ack.TxAck;
 import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.NoConsumersException;
-import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.*;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreContext;
 
-import java.util.LinkedList;
 import java.util.List;
+import java.util.ArrayList;
 
 /** A transactional context that only supports local transactions. */
 public class LocalTransactionalContext implements TransactionalContext
@@ -44,7 +42,7 @@
 
     private final TxnBuffer _txnBuffer = new TxnBuffer();
 
-    private final List<DeliveryDetails> _postCommitDeliveryList = new LinkedList<DeliveryDetails>();
+    private final List<DeliveryAction> _postCommitDeliveryList = new ArrayList<DeliveryAction>();
 
     /**
      * We keep hold of the ack operation so that we can consolidate acks, i.e. multiple acks within a txn are
@@ -52,81 +50,120 @@
      */
     private TxAck _ackOp;
 
-    private List<RequiredDeliveryException> _returnMessages;
-
-    private final MessageStore _messageStore;
-
-    private final StoreContext _storeContext;
-
     private boolean _inTran = false;
 
     /** Are there messages to deliver. NOT Has the message been delivered */
     private boolean _messageDelivered = false;
+    private final AMQChannel _channel;
+
 
-    private static class DeliveryDetails
+    private abstract class DeliveryAction
     {
-        public QueueEntry entry;
 
-        private boolean deliverFirst;
+        abstract public void process() throws AMQException;
+
+    }
+
+    private class RequeueAction extends DeliveryAction
+    {
+        public QueueEntry entry;
 
-        public DeliveryDetails(QueueEntry entry, boolean deliverFirst)
+        public RequeueAction(QueueEntry entry)
         {
             this.entry = entry;
-            this.deliverFirst = deliverFirst;
+        }
+
+        public void process() throws AMQException
+        {
+            entry.requeue(getStoreContext());
         }
     }
 
-    public LocalTransactionalContext(MessageStore messageStore, StoreContext storeContext,
-        List<RequiredDeliveryException> returnMessages)
+    private class PublishAction extends DeliveryAction
     {
-        _messageStore = messageStore;
-        _storeContext = storeContext;
-        _returnMessages = returnMessages;
-        // _txnBuffer.enlist(new StoreMessageOperation(messageStore));
+        private final AMQQueue _queue;
+        private final AMQMessage _message;
+
+        public PublishAction(final AMQQueue queue, final AMQMessage message)
+        {
+            _queue = queue;
+            _message = message;
+        }
+
+        public void process() throws AMQException
+        {
+
+            _message.incrementReference();
+            try
+            {
+                QueueEntry entry = _queue.enqueue(getStoreContext(),_message);
+
+                if(entry.immediateAndNotDelivered())
+                {
+                    getReturnMessages().add(new NoConsumersException(_message));
+                }
+            }
+            finally
+            {
+                _message.decrementReference(getStoreContext());
+            }
+        }
+    }
+
+    public LocalTransactionalContext(final AMQChannel channel)
+    {
+        _channel = channel;
     }
 
     public StoreContext getStoreContext()
     {
-        return _storeContext;
+        return _channel.getStoreContext();
     }
 
+    public List<RequiredDeliveryException> getReturnMessages()
+    {
+        return _channel.getReturnMessages();
+    }
+
+    public MessageStore getMessageStore()
+    {
+        return _channel.getMessageStore();
+    }
+
+
     public void rollback() throws AMQException
     {
-        _txnBuffer.rollback(_storeContext);
+        _txnBuffer.rollback(getStoreContext());
         // Hack to deal with uncommitted non-transactional writes
-        if (_messageStore.inTran(_storeContext))
+        if (getMessageStore().inTran(getStoreContext()))
         {
-            _messageStore.abortTran(_storeContext);
+            getMessageStore().abortTran(getStoreContext());
             _inTran = false;
         }
 
         _postCommitDeliveryList.clear();
     }
 
-    public void deliver(QueueEntry entry, boolean deliverFirst) throws AMQException
+    public void deliver(final AMQQueue queue, AMQMessage message) throws AMQException
     {
         // A publication will result in the enlisting of several
         // TxnOps. The first is an op that will store the message.
         // Following that (and ordering is important), an op will
         // be added for every queue onto which the message is
-        // enqueued. Finally a cleanup op will be added to decrement
-        // the reference associated with the routing.
-        // message.incrementReference();
-        _postCommitDeliveryList.add(new DeliveryDetails(entry, deliverFirst));
+        // enqueued.
+        _postCommitDeliveryList.add(new PublishAction(queue, message));
         _messageDelivered = true;
-        _txnBuffer.enlist(new CleanupMessageOperation(entry.getMessage(), _returnMessages));
-        /*_txnBuffer.enlist(new DeliverMessageOperation(message, queue));
-        if (_log.isDebugEnabled())
-        {
-            _log.debug("Incrementing ref count on message and enlisting cleanup operation - id " +
-                       message.getMessageId());
-        }
-        message.incrementReference();
+
+    }
+
+    public void requeue(QueueEntry entry) throws AMQException
+    {
+        _postCommitDeliveryList.add(new RequeueAction(entry));
         _messageDelivered = true;
 
-         */
     }
 
+
     private void checkAck(long deliveryTag, UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException
     {
         if (!unacknowledgedMessageMap.contains(deliveryTag))
@@ -147,10 +184,8 @@
         // as new acks come in. If this is the first ack in the txn
         // we will need to create and enlist the op.
         if (_ackOp == null)
-        {
-
+        {            
             _ackOp = new TxAck(unacknowledgedMessageMap);
-
             _txnBuffer.enlist(_ackOp);
         }
         // update the op to include this ack request
@@ -189,7 +224,7 @@
                 _log.debug("Starting transaction on message store: " + this);
             }
 
-            _messageStore.beginTran(_storeContext);
+            getMessageStore().beginTran(getStoreContext());
             _inTran = true;
         }
     }
@@ -212,22 +247,22 @@
 
         if (_messageDelivered && _inTran)
         {
-            _txnBuffer.enlist(new StoreMessageOperation(_messageStore));
+            _txnBuffer.enlist(new StoreMessageOperation(getMessageStore()));
         }
         // fixme fail commit here ... QPID-440
         try
         {
-            _txnBuffer.commit(_storeContext);
+            _txnBuffer.commit(getStoreContext());
         }
         finally
         {
             _messageDelivered = false;
-            _inTran = _messageStore.inTran(_storeContext);
+            _inTran = getMessageStore().inTran(getStoreContext());
         }
 
         try
         {
-            postCommitDelivery(_returnMessages);
+            postCommitDelivery();
         }
         catch (AMQException e)
         {
@@ -236,7 +271,7 @@
         }
     }
 
-    private void postCommitDelivery(List<RequiredDeliveryException> returnMessages) throws AMQException
+    private void postCommitDelivery() throws AMQException
     {
         if (_log.isDebugEnabled())
         {
@@ -245,18 +280,9 @@
 
         try
         {
-            for (DeliveryDetails dd : _postCommitDeliveryList)
+            for (DeliveryAction dd : _postCommitDeliveryList)
             {
-                dd.entry.process(_storeContext, dd.deliverFirst);
-
-                try
-                {
-                    dd.entry.checkDeliveredToConsumer();
-                }
-                catch (NoConsumersException nce)
-                {
-                    returnMessages.add(nce);
-                }
+                dd.process();
             }
         }
         finally

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java Thu Aug 14 20:40:49 2008
@@ -22,19 +22,14 @@
 
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Set;
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.server.ack.UnacknowledgedMessage;
 import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.NoConsumersException;
-import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.*;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreContext;
 
@@ -49,6 +44,8 @@
     /** Where to put undeliverable messages */
     private final List<RequiredDeliveryException> _returnMessages;
 
+
+
     private final MessageStore _messageStore;
 
     private final StoreContext _storeContext;
@@ -57,12 +54,6 @@
     private boolean _inTran;
 
     public NonTransactionalContext(MessageStore messageStore, StoreContext storeContext, AMQChannel channel,
-                                   List<RequiredDeliveryException> returnMessages, Set<Long> browsedAcks)
-    {
-        this(messageStore,storeContext,channel,returnMessages);
-    }
-
-    public NonTransactionalContext(MessageStore messageStore, StoreContext storeContext, AMQChannel channel,
                                    List<RequiredDeliveryException> returnMessages)
     {
         _channel = channel;
@@ -97,19 +88,22 @@
         // Does not apply to this context
     }
 
-    public void deliver(QueueEntry entry, boolean deliverFirst) throws AMQException
+    public void deliver(final AMQQueue queue, AMQMessage message) throws AMQException
     {
-        try
+        QueueEntry entry = queue.enqueue(_storeContext, message);
+        
+        //following check implements the functionality
+        //required by the 'immediate' flag:
+        if(entry.immediateAndNotDelivered())
         {
-            entry.process(_storeContext, deliverFirst);
-            //following check implements the functionality
-            //required by the 'immediate' flag:
-            entry.checkDeliveredToConsumer();
-        }
-        catch (NoConsumersException e)
-        {
-            _returnMessages.add(e);
+            _returnMessages.add(new NoConsumersException(entry.getMessage()));
         }
+
+    }
+
+    public void requeue(QueueEntry entry) throws AMQException
+    {
+        entry.requeue(_storeContext);
     }
 
     public void acknowledgeMessage(final long deliveryTag, long lastDeliveryTag,
@@ -118,7 +112,7 @@
     {
 
         final boolean debug = _log.isDebugEnabled();
-
+        ;
         if (multiple)
         {
             if (deliveryTag == 0)
@@ -130,7 +124,7 @@
                           unacknowledgedMessageMap.size());
                 unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
                 {
-                    public boolean callback(UnacknowledgedMessage message) throws AMQException
+                    public boolean callback(final long deliveryTag, QueueEntry message) throws AMQException
                     {
                         if (debug)
                         {
@@ -159,28 +153,13 @@
                     throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel");
                 }
 
-                LinkedList<UnacknowledgedMessage> acked = new LinkedList<UnacknowledgedMessage>();
-                unacknowledgedMessageMap.drainTo(acked, deliveryTag);
-                for (UnacknowledgedMessage msg : acked)
-                {
-                    if (debug)
-                    {
-                        _log.debug("Discarding message: " + msg.getMessage().getMessageId());
-                    }
-                    if(msg.getMessage().isPersistent())
-                    {
-                        beginTranIfNecessary();
-                    }
-
-                    //Message has been ack so discard it. This will dequeue and decrement the reference.
-                    msg.discard(_storeContext);
-                }
+                unacknowledgedMessageMap.drainTo(deliveryTag, _storeContext);
             }
         }
         else
         {
-            UnacknowledgedMessage msg;
-            msg = unacknowledgedMessageMap.remove(deliveryTag);
+            QueueEntry msg;
+            msg = unacknowledgedMessageMap.get(deliveryTag);
 
             if (msg == null)
             {
@@ -202,19 +181,20 @@
             //Message has been ack so discard it. This will dequeue and decrement the reference.
             msg.discard(_storeContext);
 
+            unacknowledgedMessageMap.remove(deliveryTag);
+
+
             if (debug)
             {
                 _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " +
                            msg.getMessage().getMessageId());
             }
         }
-
         if(_inTran)
         {
             _messageStore.commitTran(_storeContext);
             _inTran = false;
         }
-
     }
 
     public void messageFullyReceived(boolean persistent) throws AMQException
@@ -228,6 +208,6 @@
 
     public void messageProcessed(AMQProtocolSession protocolSession) throws AMQException
     {
-        _channel.processReturns(protocolSession);
+        _channel.processReturns();
     }
 }

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java Thu Aug 14 20:40:49 2008
@@ -25,6 +25,7 @@
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.store.StoreContext;
 
 /**
@@ -106,18 +107,26 @@
     void rollback() throws AMQException;
 
     /**
-     * Delivers the specified message to the specified queue. A 'deliverFirst' flag may be set if the message is a
-     * redelivery, and should be placed on the front of the queue.
+     * Delivers the specified message to the specified queue.
      *
      * <p/>This is an 'enqueue' operation.
      *
-     * @param entry        The message to deliver, and the queue to deliver to.
-     * @param deliverFirst <tt>true</tt> to place the message on the front of the queue for redelivery, <tt>false</tt>
-     *                     for normal FIFO message ordering.
-     *
+     * @param queue        The queue to deliver the message to.
+     * @param message      The message to deliver
      * @throws AMQException If the message cannot be delivered for any reason.
      */
-    void deliver(QueueEntry entry, boolean deliverFirst) throws AMQException;
+    void deliver(final AMQQueue queue, AMQMessage message) throws AMQException;
+
+    /**
+     * Requeues the specified queue entry (a message/queue pair).
+     *
+     *
+     * @param queueEntry   The message/queue pair to requeue.
+     *
+     * @throws AMQException If the message cannot be requeued for any reason.
+     */
+    void requeue(QueueEntry queueEntry) throws AMQException;
+
 
     /**
      * Acknowledges a message or many messages as delivered. All messages up to a specified one, may be acknowledged by

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java Thu Aug 14 20:40:49 2008
@@ -42,19 +42,6 @@
 
 public class NullApplicationRegistry extends ApplicationRegistry
 {
-    private ManagedObjectRegistry _managedObjectRegistry;
-
-    private AuthenticationManager _authenticationManager;
-
-    private VirtualHostRegistry _virtualHostRegistry;
-
-    private ACLPlugin _accessManager;
-
-    private PrincipalDatabaseManager _databaseManager;
-
-    private PluginManager _pluginManager;
-
-
     public NullApplicationRegistry()
     {
         super(new MapConfiguration(new HashMap()));
@@ -62,6 +49,8 @@
 
     public void initialise() throws Exception
     {
+        _logger.info("Initialising NullApplicationRegistry");
+        
         _configuration.addProperty("store.class", "org.apache.qpid.server.store.MemoryMessageStore");
 
         Properties users = new Properties();
@@ -84,47 +73,11 @@
 
     }
 
-    public Configuration getConfiguration()
-    {
-        return _configuration;
-    }
-
-
-    public ManagedObjectRegistry getManagedObjectRegistry()
-    {
-        return _managedObjectRegistry;
-    }
-
-    public PrincipalDatabaseManager getDatabaseManager()
-    {
-        return _databaseManager;
-    }
-
-    public AuthenticationManager getAuthenticationManager()
-    {
-        return _authenticationManager;
-    }
-
     public Collection<String> getVirtualHostNames()
     {
         String[] hosts = {"test"};
         return Arrays.asList(hosts);
     }
-
-    public VirtualHostRegistry getVirtualHostRegistry()
-    {
-        return _virtualHostRegistry;
-    }
-
-    public ACLPlugin getAccessManager()
-    {
-        return _accessManager;
-    }
-
-    public PluginManager getPluginManager()
-    {
-        return _pluginManager;
-    }
 }
 
 

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java Thu Aug 14 20:40:49 2008
@@ -1,44 +1,44 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost;
-
-import java.io.IOException;
-
-import org.apache.qpid.server.management.MBeanAttribute;
-
-/**
- * The management interface exposed to allow management of an Exchange.
- * @version 0.1
- */
-public interface ManagedVirtualHost
-{
-    static final String TYPE = "VirtualHost";
-
-    /**
-     * Returns the name of the managed virtualHost.
-     * @return the name of the exchange.
-     * @throws java.io.IOException
-     */
-    @MBeanAttribute(name="Name", description= TYPE + " Name")
-    String getName() throws IOException;
-
-
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import java.io.IOException;
+
+import org.apache.qpid.server.management.MBeanAttribute;
+
+/**
+ * The management interface exposed to allow management of a VirtualHost.
+ * @version 0.1
+ */
+public interface ManagedVirtualHost
+{
+    static final String TYPE = "VirtualHost";
+
+    /**
+     * Returns the name of the managed virtualHost.
+     * @return the name of the virtual host.
+     * @throws java.io.IOException
+     */
+    @MBeanAttribute(name="Name", description= TYPE + " Name")
+    String getName() throws IOException;
+
+
+}

Propchange: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ManagedVirtualHost.java
            ('svn:eol-style' removed)

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Thu Aug 14 20:40:49 2008
@@ -26,6 +26,8 @@
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.log4j.Logger;
 import org.apache.qpid.server.AMQBrokerManagerMBean;
+import org.apache.qpid.server.connection.ConnectionRegistry;
+import org.apache.qpid.server.connection.IConnectionRegistry;
 import org.apache.qpid.server.security.access.ACLPlugin;
 import org.apache.qpid.server.security.access.ACLManager;
 import org.apache.qpid.server.security.access.Accessable;
@@ -55,6 +57,8 @@
 
     private final String _name;
 
+    private ConnectionRegistry _connectionRegistry;
+
     private QueueRegistry _queueRegistry;
 
     private ExchangeRegistry _exchangeRegistry;
@@ -71,10 +75,11 @@
 
     private ACLPlugin _accessManager;
 
-    private Timer _houseKeepingTimer;
-
+    private final Timer _houseKeepingTimer;
+     
     private static final long DEFAULT_HOUSEKEEPING_PERIOD = 30000L;
-    
+
+
     public void setAccessableName(String name)
     {
         _logger.warn("Setting Accessable Name for VirualHost is not allowed. ("
@@ -86,6 +91,10 @@
         return _name;
     }
 
+    public IConnectionRegistry getConnectionRegistry()
+    {
+        return _connectionRegistry;
+    }
 
     /**
      * Abstract MBean class. This has some of the methods implemented from management intrerface for exchanges. Any
@@ -140,12 +149,18 @@
 
     public VirtualHost(String name, Configuration hostConfig, MessageStore store) throws Exception
     {
+        if (name == null || name.length() == 0)
+        {
+            throw new IllegalArgumentException("Illegal name (" + name + ") for virtualhost.");
+        }
+
         _name = name;
 
         _virtualHostMBean = new VirtualHostMBean();
-        // This isn't needed to be registered
-        //_virtualHostMBean.register();
 
+        _connectionRegistry = new ConnectionRegistry(this);
+
+        _houseKeepingTimer = new Timer("Queue-housekeeping-"+name, true);
         _queueRegistry = new DefaultQueueRegistry(this);
         _exchangeFactory = new DefaultExchangeFactory(this);
         _exchangeFactory.initialise(hostConfig);
@@ -172,25 +187,22 @@
 
         _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
         _brokerMBean.register();
-
-        _houseKeepingTimer = new Timer("Queue-housekeeping-" + _name, true);
-        
         initialiseHouseKeeping(hostConfig);
     }
 
     private void initialiseHouseKeeping(final Configuration hostConfig)
     {
-
+     
         long period = hostConfig.getLong("housekeeping.expiredMessageCheckPeriod", DEFAULT_HOUSEKEEPING_PERIOD);
-
+    
         /* add a timer task to iterate over queues, cleaning expired messages from queues with no consumers */
-        if (period != 0L)
+        if(period != 0L)
         {
             class RemoveExpiredMessagesTask extends TimerTask
             {
                 public void run()
                 {
-                    for (AMQQueue q : _queueRegistry.getQueues())
+                    for(AMQQueue q : _queueRegistry.getQueues())
                     {
 
                         try
@@ -199,7 +211,7 @@
                         }
                         catch (AMQException e)
                         {
-                            _logger.error("Exception in housekeeping for queue: " + q.getName().toString(), e);
+                            _logger.error("Exception in housekeeping for queue: " + q.getName().toString(),e);
                             throw new RuntimeException(e);
                         }
                     }
@@ -207,11 +219,11 @@
             }
 
             _houseKeepingTimer.scheduleAtFixedRate(new RemoveExpiredMessagesTask(),
-                                                   period / 2,
-                                                   period);
+                    period/2,
+                    period);
         }
     }
-
+    
     private void initialiseMessageStore(Configuration config) throws Exception
     {
         String messageStoreClass = config.getString("store.class");
@@ -285,14 +297,20 @@
     public ACLPlugin getAccessManager()
     {
         return _accessManager;
-    }
+    }                                                                   
 
     public void close() throws Exception
     {
+        //Stop Housekeeping
         if (_houseKeepingTimer != null)
         {
             _houseKeepingTimer.cancel();
         }
+
+        //Stop Connections
+        _connectionRegistry.close();
+
+        //Close MessageStore
         if (_messageStore != null)
         {
             _messageStore.close();

Propchange: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
            ('svn:eol-style' removed)

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java Thu Aug 14 20:40:49 2008
@@ -1,70 +1,70 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-
-public class VirtualHostRegistry
-{
-    private final Map<String, VirtualHost> _registry = new ConcurrentHashMap<String,VirtualHost>();
-
-
-    private String _defaultVirtualHostName;
-
-    public synchronized void registerVirtualHost(VirtualHost host) throws Exception
-    {
-        if(_registry.containsKey(host.getName()))
-        {
-            throw new Exception("Virtual Host with name " + host.getName() + " already registered.");
-        }
-        _registry.put(host.getName(),host);
-    }
-
-    public VirtualHost getVirtualHost(String name)
-    {
-        if(name == null || name.trim().length() == 0 )
-        {
-            name = getDefaultVirtualHostName();
-        }
-
-        return _registry.get(name);
-    }
-
-    private String getDefaultVirtualHostName()
-    {
-        return _defaultVirtualHostName;
-    }
-
-    public void setDefaultVirtualHostName(String defaultVirtualHostName)
-    {
-        _defaultVirtualHostName = defaultVirtualHostName;
-    }
-
-
-    public Collection<VirtualHost> getVirtualHosts()
-    {
-        return new ArrayList<VirtualHost>(_registry.values());
-    }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+public class VirtualHostRegistry
+{
+    private final Map<String, VirtualHost> _registry = new ConcurrentHashMap<String,VirtualHost>();
+
+
+    private String _defaultVirtualHostName;
+
+    public synchronized void registerVirtualHost(VirtualHost host) throws Exception
+    {
+        if(_registry.containsKey(host.getName()))
+        {
+            throw new Exception("Virtual Host with name " + host.getName() + " already registered.");
+        }
+        _registry.put(host.getName(),host);
+    }
+
+    public VirtualHost getVirtualHost(String name)
+    {
+        if(name == null || name.trim().length() == 0 )
+        {
+            name = getDefaultVirtualHostName();
+        }
+
+        return _registry.get(name);
+    }
+
+    private String getDefaultVirtualHostName()
+    {
+        return _defaultVirtualHostName;
+    }
+
+    public void setDefaultVirtualHostName(String defaultVirtualHostName)
+    {
+        _defaultVirtualHostName = defaultVirtualHostName;
+    }
+
+
+    public Collection<VirtualHost> getVirtualHosts()
+    {
+        return new ArrayList<VirtualHost>(_registry.values());
+    }
+}

Propchange: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java
            ('svn:eol-style' removed)

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/tools/messagestore/MessageStoreTool.java Thu Aug 14 20:40:49 2008
@@ -25,11 +25,11 @@
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.qpid.configuration.Configuration;
 import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
 import org.apache.qpid.server.store.MemoryMessageStore;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.tools.messagestore.commands.Clear;
 import org.apache.qpid.tools.messagestore.commands.Command;
 import org.apache.qpid.tools.messagestore.commands.Copy;

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java Thu Aug 14 20:40:49 2008
@@ -20,8 +20,8 @@
  */
 package org.apache.qpid.tools.messagestore.commands;
 
-import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.tools.messagestore.MessageStoreTool;
+import org.apache.qpid.server.queue.AMQQueue;
 
 public class Copy extends Move
 {

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java Thu Aug 14 20:40:49 2008
@@ -24,6 +24,7 @@
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntryImpl;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.tools.messagestore.MessageStoreTool;
 import org.apache.qpid.tools.utils.Console;
@@ -255,7 +256,7 @@
                                     String title, boolean routing, boolean headers, boolean messageHeaders)
     {
         List<QueueEntry> single = new LinkedList<QueueEntry>();
-        single.add(new QueueEntry(null,msg));
+        single.add(new QueueEntryImpl(null,msg, Long.MIN_VALUE));
 
         List<List> routingData = super.createMessageData(null, single, headers, routing, messageHeaders);
 

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java Thu Aug 14 20:40:49 2008
@@ -21,7 +21,7 @@
 package org.apache.qpid.tools.messagestore.commands;
 
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntryImpl;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.store.StoreContext;

Modified: incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java Thu Aug 14 20:40:49 2008
@@ -20,9 +20,8 @@
  */
 package org.apache.qpid.tools.messagestore.commands;
 
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.tools.messagestore.MessageStoreTool;
+import org.apache.qpid.server.queue.AMQQueue;
 
 public class Purge extends Move
 {