You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2009/10/12 01:22:20 UTC

svn commit: r824198 [6/9] - in /qpid/branches/java-network-refactor: ./ qpid/cpp/ qpid/cpp/bindings/qmf/ qpid/cpp/bindings/qmf/python/ qpid/cpp/bindings/qmf/ruby/ qpid/cpp/bindings/qmf/tests/ qpid/cpp/boost-1.32-support/ qpid/cpp/etc/ qpid/cpp/examples...

Propchange: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 11 23:22:08 2009
@@ -1,3 +1,3 @@
 /qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:757257
 /qpid/branches/jmx_mc_gsoc09/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:787599
-/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,805429-816233
+/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,805429-824132

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Sun Oct 11 23:22:08 2009
@@ -160,6 +160,17 @@
     void setMinimumAlertRepeatGap(long value);
 
 
+    long getCapacity();
+
+    void setCapacity(long capacity);
+
+
+    long getFlowResumeCapacity();
+
+    void setFlowResumeCapacity(long flowResumeCapacity);
+
+
+
     void deleteMessageFromTop(StoreContext storeContext) throws AMQException;
 
     long clearQueue(StoreContext storeContext) throws AMQException;
@@ -180,6 +191,8 @@
 
     void stop();
 
+    void checkCapacity(AMQChannel channel);
+
     /**
      * ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription
      * already exists.

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java Sun Oct 11 23:22:08 2009
@@ -26,11 +26,105 @@
 import org.apache.qpid.server.configuration.QueueConfiguration;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
+import java.util.Map;
+import java.util.HashMap;
+
 
 public class AMQQueueFactory
 {
     public static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities");
 
+    private abstract static class QueueProperty
+    {
+
+        private final AMQShortString _argumentName;
+
+
+        public QueueProperty(String argumentName)
+        {
+            _argumentName = new AMQShortString(argumentName);
+        }
+
+        public AMQShortString getArgumentName()
+        {
+            return _argumentName;
+        }
+
+
+        public abstract void setPropertyValue(AMQQueue queue, Object value);
+
+    }
+
+    private abstract static class QueueLongProperty extends QueueProperty
+    {
+
+        public QueueLongProperty(String argumentName)
+        {
+            super(argumentName);
+        }
+
+        public void setPropertyValue(AMQQueue queue, Object value)
+        {
+            if(value instanceof Number)
+            {
+                setPropertyValue(queue, ((Number)value).longValue());
+            }
+
+        }
+
+        abstract void setPropertyValue(AMQQueue queue, long value);
+
+
+    }
+
+    private static final QueueProperty[] DECLAREABLE_PROPERTIES = {
+            new QueueLongProperty("x-qpid-maximum-message-age")
+            {
+                public void setPropertyValue(AMQQueue queue, long value)
+                {
+                    queue.setMaximumMessageAge(value);
+                }
+            },
+            new QueueLongProperty("x-qpid-maximum-message-size")
+            {
+                public void setPropertyValue(AMQQueue queue, long value)
+                {
+                    queue.setMaximumMessageSize(value);
+                }
+            },
+            new QueueLongProperty("x-qpid-maximum-message-count")
+            {
+                public void setPropertyValue(AMQQueue queue, long value)
+                {
+                    queue.setMaximumMessageCount(value);
+                }
+            },
+            new QueueLongProperty("x-qpid-minimum-alert-repeat-gap")
+            {
+                public void setPropertyValue(AMQQueue queue, long value)
+                {
+                    queue.setMinimumAlertRepeatGap(value);
+                }
+            },
+            new QueueLongProperty("x-qpid-capacity")
+            {
+                public void setPropertyValue(AMQQueue queue, long value)
+                {
+                    queue.setCapacity(value);
+                }
+            },
+            new QueueLongProperty("x-qpid-flow-resume-capacity")
+            {
+                public void setPropertyValue(AMQQueue queue, long value)
+                {
+                    queue.setFlowResumeCapacity(value);
+                }
+            }
+
+    };
+
+
+
     public static AMQQueue createAMQQueueImpl(AMQShortString name,
                                               boolean durable,
                                               AMQShortString owner,
@@ -53,6 +147,18 @@
         //Register the new queue
         virtualHost.getQueueRegistry().registerQueue(q);
         q.configure(virtualHost.getConfiguration().getQueueConfiguration(name.asString()));
+
+        if(arguments != null)
+        {
+            for(QueueProperty p : DECLAREABLE_PROPERTIES)
+            {
+                if(arguments.containsKey(p.getArgumentName()))
+                {
+                    p.setPropertyValue(q, arguments.get(p.getArgumentName()));
+                }
+            }
+        }
+
         return q;
     }
 

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Sun Oct 11 23:22:08 2009
@@ -42,8 +42,7 @@
 
     private final SimpleQueueEntryList _queueEntryList;
 
-    private AMQMessage _message;
-
+    private final AMQMessage _message;
 
     private Set<Subscription> _rejectedBy = null;
 
@@ -177,13 +176,21 @@
 
     public String debugIdentity()
     {
-        return getMessage().debugIdentity();
+        AMQMessage message = getMessage();
+        if (message == null)
+        {
+            return "null";
+        }
+        else
+        {
+            return message.debugIdentity();
+        }
     }
 
 
     public boolean immediateAndNotDelivered() 
     {
-        return _message.immediateAndNotDelivered();
+        return getMessage().immediateAndNotDelivered();
     }
 
     public void setRedelivered(boolean b)
@@ -385,4 +392,5 @@
     {
         return _queueEntryList;
     }
+
 }

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Sun Oct 11 23:22:08 2009
@@ -1,11 +1,10 @@
 package org.apache.qpid.server.queue;
 
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -35,6 +34,7 @@
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.messages.QueueMessages;
+import org.apache.qpid.server.AMQChannel;
 
 /*
 *
@@ -96,6 +96,8 @@
     private final Executor _asyncDelivery;
     private final AtomicLong _totalMessagesReceived = new AtomicLong();
 
+    private final ConcurrentMap<AMQChannel, Boolean> _blockedChannels = new ConcurrentHashMap<AMQChannel, Boolean>();
+
     /** max allowed size(KB) of a single message */
     public long _maximumMessageSize = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageSize();
 
@@ -122,6 +124,11 @@
     private LogSubject _logSubject;
     private LogActor _logActor;
 
+
+    private long _capacity = ApplicationRegistry.getInstance().getConfiguration().getCapacity();
+    private long _flowResumeCapacity = ApplicationRegistry.getInstance().getConfiguration().getFlowResumeCapacity();
+    private final AtomicBoolean _overfull = new AtomicBoolean(false);
+
     protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
             throws AMQException
     {
@@ -508,8 +515,11 @@
             throws AMQException
     {
         _deliveredMessages.incrementAndGet();
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug(sub + ": deliverMessage: " + entry.debugIdentity());
+        }
         sub.send(entry);
-
     }
 
     private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry)
@@ -626,6 +636,8 @@
             throw new FailedDequeueException(_name.toString(), e);
         }
 
+        checkCapacity();
+
     }
 
     private void decrementQueueSize(final QueueEntry entry)
@@ -1170,11 +1182,64 @@
         }
     }
 
-    public void deliverAsync()
+    public void checkCapacity(AMQChannel channel)
     {
-        _stateChangeCount.incrementAndGet();
+        if(_capacity != 0l)
+        {
+            if(_atomicQueueSize.get() > _capacity)
+            {
+                _overfull.set(true);
+                //Overfull log message
+                _logActor.message(_logSubject, QueueMessages.QUE_1003(_atomicQueueSize.get(), _capacity));
+
+                if(_blockedChannels.putIfAbsent(channel, Boolean.TRUE)==null)
+                {
+                    channel.block(this);
+                }
+
+                if(_atomicQueueSize.get() <= _flowResumeCapacity)
+                {
+
+                    //Underfull log message
+                    _logActor.message(_logSubject, QueueMessages.QUE_1004(_atomicQueueSize.get(), _flowResumeCapacity));
+
+                   channel.unblock(this);
+                   _blockedChannels.remove(channel);
+
+                }
+
+            }
+
+
+
+        }
+    }
+
+    private void checkCapacity()
+    {
+        if(_capacity != 0L)
+        {
+            if(_overfull.get() && _atomicQueueSize.get() <= _flowResumeCapacity)
+            {
+                if(_overfull.compareAndSet(true,false))
+                {//Underfull log message
+                    _logActor.message(_logSubject, QueueMessages.QUE_1004(_atomicQueueSize.get(), _flowResumeCapacity));
+                }
 
-        Runner runner = new Runner();
+
+                for(AMQChannel c : _blockedChannels.keySet())
+                {
+                    c.unblock(this);
+                    _blockedChannels.remove(c);
+                }
+            }
+        }
+    }
+
+
+    public void deliverAsync()
+    {
+        Runner runner = new Runner(_stateChangeCount.incrementAndGet());
 
         if (_asynchronousRunner.compareAndSet(null, runner))
         {
@@ -1187,13 +1252,23 @@
         _asyncDelivery.execute(new SubFlushRunner(sub));
     }
 
+
     private class Runner implements ReadWriteRunnable
     {
+        String _name;
+        public Runner(long count)
+        {
+            _name = "QueueRunner-" + count + "-" + _logActor;
+        }
+
         public void run()
         {
+            String originalName = Thread.currentThread().getName();
             try
             {
+                Thread.currentThread().setName(_name);
                 CurrentActor.set(_logActor);
+
                 processQueue(this);
             }
             catch (AMQException e)
@@ -1203,9 +1278,8 @@
             finally
             {
                 CurrentActor.remove();
+                Thread.currentThread().setName(originalName);
             }
-
-
         }
 
         public boolean isRead()
@@ -1217,6 +1291,11 @@
         {
             return true;
         }
+
+        public String toString()
+        {
+            return _name;
+        }
     }
 
     private class SubFlushRunner implements ReadWriteRunnable
@@ -1230,27 +1309,36 @@
 
         public void run()
         {
-            boolean complete = false;
-            try
-            {
-                CurrentActor.set(_sub.getLogActor());
-                complete = flushSubscription(_sub, new Long(MAX_ASYNC_DELIVERIES));
 
-            }
-            catch (AMQException e)
-            {
-                _logger.error(e);
+            String originalName = Thread.currentThread().getName();
+            try{
+                Thread.currentThread().setName("SubFlushRunner-"+_sub);
+
+                boolean complete = false;
+                try
+                {
+                    CurrentActor.set(_sub.getLogActor());
+                    complete = flushSubscription(_sub, new Long(MAX_ASYNC_DELIVERIES));
+
+                }
+                catch (AMQException e)
+                {
+                    _logger.error(e);
+                }
+                finally
+                {
+                    CurrentActor.remove();
+                }
+                if (!complete && !_sub.isSuspended())
+                {
+                    _asyncDelivery.execute(this);
+                }
             }
             finally
             {
-                CurrentActor.remove();
-            }
-            if (!complete && !_sub.isSuspended())
-            {
-                _asyncDelivery.execute(this);
+                Thread.currentThread().setName(originalName);
             }
 
-
         }
 
         public boolean isRead()
@@ -1278,7 +1366,7 @@
             try
             {
                 sub.getSendLock();
-                atTail =  attemptDelivery(sub);
+                atTail = attemptDelivery(sub);
                 if (atTail && sub.isAutoClose())
                 {
                     unregisterSubscription(sub);
@@ -1308,63 +1396,81 @@
         return atTail;
     }
 
+    /**
+     * Attempt delivery for the given subscription.
+     *
+     * Looks up the next node for the subscription and attempts to deliver it.
+     *
+     * @param sub
+     * @return true if we have completed all possible deliveries for this sub.
+     * @throws AMQException
+     */
     private boolean attemptDelivery(Subscription sub) throws AMQException
     {
         boolean atTail = false;
         boolean advanced = false;
-        boolean subActive = sub.isActive();
+        boolean subActive = sub.isActive() && !sub.isSuspended();
         if (subActive)
         {
             QueueEntry node = moveSubscriptionToNextNode(sub);
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug(sub + ": attempting Delivery: " + node.debugIdentity());
+            }
             if (!(node.isAcquired() || node.isDeleted()))
             {
-                if (!sub.isSuspended())
+                if (sub.hasInterest(node))
                 {
-                    if (sub.hasInterest(node))
+                    if (!sub.wouldSuspend(node))
                     {
-                        if (!sub.wouldSuspend(node))
+                        if (!sub.isBrowser() && !node.acquire(sub))
                         {
-                            if (!sub.isBrowser() && !node.acquire(sub))
-                            {
-                                sub.restoreCredit(node);
-                            }
-                            else
+                            sub.restoreCredit(node);
+                        }
+                        else
+                        {
+                            deliverMessage(sub, node);
+
+                            if (sub.isBrowser())
                             {
-                                deliverMessage(sub, node);
+                                QueueEntry newNode = _entries.next(node);
 
-                                if (sub.isBrowser())
+                                if (newNode != null)
                                 {
-                                    QueueEntry newNode = _entries.next(node);
-
-                                    if (newNode != null)
-                                    {
-                                        advanced = true;
-                                        sub.setLastSeenEntry(node, newNode);
-                                        node = sub.getLastSeenEntry();
-                                    }
+                                    advanced = true;
+                                    sub.setLastSeenEntry(node, newNode);
+                                    node = sub.getLastSeenEntry();
                                 }
+                                
                             }
-
-                        }
-                        else // Not enough Credit for message and wouldSuspend
-                        {
-                            //QPID-1187 - Treat the subscription as suspended for this message
-                            // and wait for the message to be removed to continue delivery.
-                            subActive = false;
-                            node.addStateChangeListener(new QueueEntryListener(sub, node));
                         }
+
                     }
-                    else
+                    else // Not enough Credit for message and wouldSuspend
                     {
-                        // this subscription is not interested in this node so we can skip over it
-                        QueueEntry newNode = _entries.next(node);
-                        if (newNode != null)
-                        {
-                            sub.setLastSeenEntry(node, newNode);
-                        }
+                        //QPID-1187 - Treat the subscription as suspended for this message
+                        // and wait for the message to be removed to continue delivery.
+                        
+                        // 2009-09-30 : MR : setting subActive = false only causes, this
+                        // particular delivery attempt to end. This is called from
+                        // flushSubscription and processQueue both of which attempt
+                        // delivery a number of times. Won't a bytes limited
+                        // subscriber with not enough credit for the next message
+                        // create a lot of new QELs? How about a browser that calls
+                        // this method LONG.MAX_LONG times! 
+                        subActive = false;
+                        node.addStateChangeListener(new QueueEntryListener(sub, node));
+                    }
+                }
+                else
+                {
+                    // this subscription is not interested in this node so we can skip over it
+                    QueueEntry newNode = _entries.next(node);
+                    if (newNode != null)
+                    {
+                        sub.setLastSeenEntry(node, newNode);
                     }
                 }
-
             }
             atTail = (_entries.next(node) == null) && !advanced;
         }
@@ -1409,6 +1515,12 @@
             }
 
         }
+
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug(sub + ": nextNode: " + (node == null ? "null" : node.debugIdentity()));
+        }
+
         return node;
     }
 
@@ -1423,6 +1535,11 @@
 
         _asynchronousRunner.compareAndSet(runner, null);
 
+        // For every message enqueue/requeue the we fire deliveryAsync() which
+        // increases _stateChangeCount. If _sCC changes whilst we are in our loop
+        // (detected by setting previousStateChangeCount to stateChangeCount in the loop body)
+        // then we will continue to run for a maximum of iterations.
+        // So whilst delivery/rejection is going on a processQueue thread will be running
         while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete) && _asynchronousRunner.compareAndSet(null, runner))
         {
             // we want to have one extra loop after every subscription has reached the point where it cannot move
@@ -1442,20 +1559,11 @@
             //iterate over the subscribers and try to advance their pointer
             while (subscriptionIter.advance())
             {
-                boolean closeConsumer = false;
                 Subscription sub = subscriptionIter.getNode().getSubscription();
                 sub.getSendLock();
                 try
                 {
-                    if (sub != null)
-                    {
-
-                        QueueEntry node = moveSubscriptionToNextNode(sub);
-                        if (node != null)
-                        {
-                            done = attemptDelivery(sub);
-                        }
-                    }
+                    done = attemptDelivery(sub);
                     if (done)
                     {
                         if (extraLoops == 0)
@@ -1492,11 +1600,14 @@
         // therefore we should schedule this runner again (unless someone beats us to it :-) ).
         if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner))
         {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Rescheduling runner:" + runner);
+            }
             _asyncDelivery.execute(runner);
         }
     }
 
-    @Override
     public void checkMessageStatus() throws AMQException
     {
 
@@ -1604,6 +1715,27 @@
         }
     }
 
+    public long getCapacity()
+    {
+        return _capacity;
+    }
+
+    public void setCapacity(long capacity)
+    {
+        _capacity = capacity;
+    }
+
+    public long getFlowResumeCapacity()
+    {
+        return _flowResumeCapacity;
+    }
+
+    public void setFlowResumeCapacity(long flowResumeCapacity)
+    {
+        _flowResumeCapacity = flowResumeCapacity;
+    }
+
+
     public Set<NotificationCheck> getNotificationChecks()
     {
         return _notificationChecks;
@@ -1673,6 +1805,8 @@
             setMaximumMessageSize(config.getMaximumMessageSize());
             setMaximumMessageCount(config.getMaximumMessageCount());
             setMinimumAlertRepeatGap(config.getMinimumAlertRepeatGap());
+            _capacity = config.getCapacity();
+            _flowResumeCapacity = config.getFlowResumeCapacity();
         }
     }
 }

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java Sun Oct 11 23:22:08 2009
@@ -102,7 +102,7 @@
 
             try
             {
-                instance.initialise();
+                instance.initialise(instanceID);
             }
             catch (Exception e)
             {

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java Sun Oct 11 23:22:08 2009
@@ -42,18 +42,22 @@
 
 public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
 {
+    private String _registryName;
 
     public ConfigurationFileApplicationRegistry(File configurationURL) throws ConfigurationException
     {
         super(new ServerConfiguration(configurationURL));
     }
 
-    public void initialise() throws Exception
+    public void initialise(int instanceID) throws Exception
     {
         _rootMessageLogger = new RootMessageLoggerImpl(_configuration, 
                                                        new Log4jMessageLogger());
+        
+        _registryName = String.valueOf(instanceID);
+
         // Set the Actor for current log messages
-        CurrentActor.set(new BrokerActor(_rootMessageLogger));
+        CurrentActor.set(new BrokerActor(_registryName, _rootMessageLogger));
 
         CurrentActor.get().message(BrokerMessages.BRK_1001(QpidProperties.getReleaseVersion(),QpidProperties.getBuildVersion()));        
 
@@ -83,7 +87,7 @@
     public void close() throws Exception
     {
         //Set the Actor for Broker Shutdown
-        CurrentActor.set(new BrokerActor(_rootMessageLogger));
+        CurrentActor.set(new BrokerActor(_registryName, _rootMessageLogger));
         try
         {
             super.close();

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java Sun Oct 11 23:22:08 2009
@@ -43,8 +43,9 @@
      * Initialise the application registry. All initialisation must be done in this method so that any components
      * that need access to the application registry itself for initialisation are able to use it. Attempting to
      * initialise in the constructor will lead to failures since the registry reference will not have been set.
+     * @param instanceID the instanceID that we can use to identify this AR.
      */
-    void initialise() throws Exception;
+    void initialise(int instanceID) throws Exception;
 
     /**
      * Shutdown this Registry

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java Sun Oct 11 23:22:08 2009
@@ -20,16 +20,17 @@
  */
 package org.apache.qpid.server.security.access;
 
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.QueueBindBody;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult;
-import org.apache.qpid.server.exchange.Exchange;
-
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
 
 public class PrincipalPermissions
 {
@@ -41,6 +42,7 @@
     private static final Object CREATE_QUEUES_KEY = new Object();
     private static final Object CREATE_EXCHANGES_KEY = new Object();
 
+    
     private static final Object CREATE_QUEUE_TEMPORARY_KEY = new Object();
     private static final Object CREATE_QUEUE_QUEUES_KEY = new Object();
     private static final Object CREATE_QUEUE_EXCHANGES_KEY = new Object();
@@ -80,248 +82,257 @@
     {
         switch (permission)
         {
-            case ACCESS:
-                break; // This is a no-op as the existence of this PrincipalPermission object is scoped per VHost for ACCESS
-            case BIND:
-                break; // All the details are currently included in the create setup.
             case CONSUME: // Parameters : AMQShortString queueName, Boolean Temporary, Boolean ownQueueOnly
-                Map consumeRights = (Map) _permissions.get(permission);
-
-                if (consumeRights == null)
-                {
-                    consumeRights = new ConcurrentHashMap();
-                    _permissions.put(permission, consumeRights);
-                }
-
-                //if we have parametsre
-                if (parameters.length > 0)
-                {
-                    AMQShortString queueName = (AMQShortString) parameters[0];
-                    Boolean temporary = (Boolean) parameters[1];
-                    Boolean ownQueueOnly = (Boolean) parameters[2];
-
-                    if (temporary)
-                    {
-                        consumeRights.put(CONSUME_TEMPORARY_KEY, true);
-                    }
-                    else
-                    {
-                        consumeRights.put(CONSUME_TEMPORARY_KEY, false);
-                    }
-
-                    if (ownQueueOnly)
-                    {
-                        consumeRights.put(CONSUME_OWN_QUEUES_ONLY_KEY, true);
-                    }
-                    else
-                    {
-                        consumeRights.put(CONSUME_OWN_QUEUES_ONLY_KEY, false);
-                    }
-
-
-                    LinkedList queues = (LinkedList) consumeRights.get(CONSUME_QUEUES_KEY);
-                    if (queues == null)
-                    {
-                        queues = new LinkedList();
-                        consumeRights.put(CONSUME_QUEUES_KEY, queues);
-                    }
-
-                    if (queueName != null)
-                    {
-                        queues.add(queueName);
-                    }
-                }
-
-
+                grantConsume(permission, parameters);
                 break;
             case CREATEQUEUE:  // Parameters : Boolean temporary, AMQShortString queueName
                 // , AMQShortString exchangeName , AMQShortString routingKey
-
-                Map createRights = (Map) _permissions.get(permission);
-
-                if (createRights == null)
-                {
-                    createRights = new ConcurrentHashMap();
-                    _permissions.put(permission, createRights);
-
-                }
-
-                //The existence of the empty map mean permission to all.
-                if (parameters.length == 0)
-                {
-                    return;
-                }
-
-                Boolean temporary = (Boolean) parameters[0];
-
-                AMQShortString queueName = parameters.length > 1 ? (AMQShortString) parameters[1] : null;
-                AMQShortString exchangeName = parameters.length > 2 ? (AMQShortString) parameters[2] : null;
-                //Set the routingkey to the specified value or the queueName if present
-                AMQShortString routingKey = (parameters.length > 3 && null != parameters[3]) ? (AMQShortString) parameters[3] : queueName;
-
-                // Get the queues map
-                Map create_queues = (Map) createRights.get(CREATE_QUEUES_KEY);
-
-                if (create_queues == null)
-                {
-                    create_queues = new ConcurrentHashMap();
-                    createRights.put(CREATE_QUEUES_KEY, create_queues);
-                }
-
-                //Allow all temp queues to be created
-                create_queues.put(CREATE_QUEUE_TEMPORARY_KEY, temporary);
-
-                //Create empty list of queues
-                Map create_queues_queues = (Map) create_queues.get(CREATE_QUEUE_QUEUES_KEY);
-
-                if (create_queues_queues == null)
-                {
-                    create_queues_queues = new ConcurrentHashMap();
-                    create_queues.put(CREATE_QUEUE_QUEUES_KEY, create_queues_queues);
-                }
-
-                // We are granting CREATE rights to all temporary queues only
-                if (parameters.length == 1)
-                {
-                    return;
-                }
-
-                // if we have a queueName then we need to store any associated exchange / rk bindings
-                if (queueName != null)
-                {
-                    Map queue = (Map) create_queues_queues.get(queueName);
-                    if (queue == null)
-                    {
-                        queue = new ConcurrentHashMap();
-                        create_queues_queues.put(queueName, queue);
-                    }
-
-                    if (exchangeName != null)
-                    {
-                        queue.put(exchangeName, routingKey);
-                    }
-
-                    //If no exchange is specified then the presence of the queueName in the map says any exchange is ok
-                }
-
-                // Store the exchange that we are being granted rights to. This will be used as part of binding
-
-                //Lookup the list of exchanges
-                Map create_queues_exchanges = (Map) create_queues.get(CREATE_QUEUE_EXCHANGES_KEY);
-
-                if (create_queues_exchanges == null)
-                {
-                    create_queues_exchanges = new ConcurrentHashMap();
-                    create_queues.put(CREATE_QUEUE_EXCHANGES_KEY, create_queues_exchanges);
-                }
-
-                //if we have an exchange
-                if (exchangeName != null)
-                {
-                    //Retrieve the list of permitted exchanges.
-                    Map exchanges = (Map) create_queues_exchanges.get(exchangeName);
-
-                    if (exchanges == null)
-                    {
-                        exchanges = new ConcurrentHashMap();
-                        create_queues_exchanges.put(exchangeName, exchanges);
-                    }
-
-                    //Store the temporary setting CREATE_QUEUE_EXCHANGES_ROUTINGKEYS_KEY
-                    exchanges.put(CREATE_QUEUE_EXCHANGES_TEMPORARY_KEY, temporary);
-
-                    //Store the binding details of queue/rk for this exchange.
-                    if (queueName != null)
-                    {
-                        //Retrieve the list of permitted routingKeys.
-                        Map rKeys = (Map) exchanges.get(exchangeName);
-
-                        if (rKeys == null)
-                        {
-                            rKeys = new ConcurrentHashMap();
-                            exchanges.put(CREATE_QUEUE_EXCHANGES_ROUTINGKEYS_KEY, rKeys);
-                        }
-
-                        rKeys.put(queueName, routingKey);
-                    }
-                }
+                grantCreateQueue(permission, parameters);
                 break;
             case CREATEEXCHANGE:
                 // Parameters AMQShortString exchangeName , AMQShortString Class
-                Map rights = (Map) _permissions.get(permission);
-                if (rights == null)
-                {
-                    rights = new ConcurrentHashMap();
-                    _permissions.put(permission, rights);
-                }
-
-                Map create_exchanges = (Map) rights.get(CREATE_EXCHANGES_KEY);
-                if (create_exchanges == null)
-                {
-                    create_exchanges = new ConcurrentHashMap();
-                    rights.put(CREATE_EXCHANGES_KEY, create_exchanges);
-                }
-
-                //Should perhaps error if parameters[0] is null;
-                AMQShortString name = parameters.length > 0 ? (AMQShortString) parameters[0] : null;
-                AMQShortString className = parameters.length > 1 ? (AMQShortString) parameters[1] : new AMQShortString("direct");
-
-                //Store the exchangeName / class mapping if the mapping is null
-                rights.put(name, className);
-                break;
-            case DELETE:
+                grantCreateExchange(permission, parameters);
                 break;
-
             case PUBLISH: // Parameters : Exchange exchange, AMQShortString routingKey
-                Map publishRights = (Map) _permissions.get(permission);
-
-                if (publishRights == null)
-                {
-                    publishRights = new ConcurrentHashMap();
-                    _permissions.put(permission, publishRights);
-                }
-
-                if (parameters == null || parameters.length == 0)
-                {
-                    //If we have no parameters then allow publish to all destinations
-                    // this is signified by having a null value for publish_exchanges
-                }
-                else
-                {
-                    Map publish_exchanges = (Map) publishRights.get(PUBLISH_EXCHANGES_KEY);
-
-                    if (publish_exchanges == null)
-                    {
-                        publish_exchanges = new ConcurrentHashMap();
-                        publishRights.put(PUBLISH_EXCHANGES_KEY, publish_exchanges);
-                    }
-
-
-                    HashSet routingKeys = (HashSet) publish_exchanges.get(parameters[0]);
-
-                    // Check to see if we have a routing key
-                    if (parameters.length == 2)
-                    {
-                        if (routingKeys == null)
-                        {
-                            routingKeys = new HashSet<AMQShortString>();
-                        }
-                        //Add routing key to permitted publish destinations
-                        routingKeys.add(parameters[1]);
-                    }
-
-                    // Add the updated routingkey list or null if all values allowed
-                    publish_exchanges.put(parameters[0], routingKeys);
-                }
+                grantPublish(permission, parameters);
                 break;
+            /* The other cases just fall through to no-op */
+            case DELETE:
+            case ACCESS: // This is a no-op as the existence of this PrincipalPermission object is scoped per VHost for ACCESS
+            case BIND: // All the details are currently included in the create setup.
             case PURGE:
-                break;
             case UNBIND:
                 break;
         }
 
     }
 
+	private void grantPublish(Permission permission, Object... parameters) {
+		Map publishRights = (Map) _permissions.get(permission);
+
+		if (publishRights == null)
+		{
+		    publishRights = new ConcurrentHashMap();
+		    _permissions.put(permission, publishRights);
+		}
+
+		if (parameters == null || parameters.length == 0)
+		{
+		    //If we have no parameters then allow publish to all destinations
+		    // this is signified by having a null value for publish_exchanges
+		}
+		else
+		{
+		    Map publish_exchanges = (Map) publishRights.get(PUBLISH_EXCHANGES_KEY);
+
+		    if (publish_exchanges == null)
+		    {
+		        publish_exchanges = new ConcurrentHashMap();
+		        publishRights.put(PUBLISH_EXCHANGES_KEY, publish_exchanges);
+		    }
+
+
+		    HashSet routingKeys = (HashSet) publish_exchanges.get(parameters[0]);
+
+		    // Check to see if we have a routing key
+		    if (parameters.length == 2)
+		    {
+		        if (routingKeys == null)
+		        {
+		            routingKeys = new HashSet<AMQShortString>();
+		        }
+		        //Add routing key to permitted publish destinations
+		        routingKeys.add(parameters[1]);
+		    }
+
+		    // Add the updated routingkey list or null if all values allowed
+		    publish_exchanges.put(parameters[0], routingKeys);
+		}
+	}
+
+	private void grantCreateExchange(Permission permission, Object... parameters) {
+		Map rights = (Map) _permissions.get(permission);
+		if (rights == null)
+		{
+		    rights = new ConcurrentHashMap();
+		    _permissions.put(permission, rights);
+		}
+
+		Map create_exchanges = (Map) rights.get(CREATE_EXCHANGES_KEY);
+		if (create_exchanges == null)
+		{
+		    create_exchanges = new ConcurrentHashMap();
+		    rights.put(CREATE_EXCHANGES_KEY, create_exchanges);
+		}
+
+		//Should perhaps error if parameters[0] is null;
+		AMQShortString name = parameters.length > 0 ? (AMQShortString) parameters[0] : null;
+		AMQShortString className = parameters.length > 1 ? (AMQShortString) parameters[1] : new AMQShortString("direct");
+
+		//Store the exchangeName / class mapping if the mapping is null
+		rights.put(name, className);
+	}
+
+	private void grantCreateQueue(Permission permission, Object... parameters) {
+		Map createRights = (Map) _permissions.get(permission);
+
+		if (createRights == null)
+		{
+		    createRights = new ConcurrentHashMap();
+		    _permissions.put(permission, createRights);
+
+		}
+
+		//The existence of the empty map mean permission to all.
+		if (parameters.length == 0)
+		{
+		    return;
+		}
+
+		Boolean temporary = (Boolean) parameters[0];
+
+		AMQShortString queueName = parameters.length > 1 ? (AMQShortString) parameters[1] : null;
+		AMQShortString exchangeName = parameters.length > 2 ? (AMQShortString) parameters[2] : null;
+		//Set the routingkey to the specified value or the queueName if present
+		AMQShortString routingKey = (parameters.length > 3 && null != parameters[3]) ? (AMQShortString) parameters[3] : queueName;
+
+		// Get the queues map
+		Map create_queues = (Map) createRights.get(CREATE_QUEUES_KEY);
+
+		if (create_queues == null)
+		{
+		    create_queues = new ConcurrentHashMap();
+		    createRights.put(CREATE_QUEUES_KEY, create_queues);
+		}
+
+		//Allow all temp queues to be created
+		create_queues.put(CREATE_QUEUE_TEMPORARY_KEY, temporary);
+
+		//Create empty list of queues
+		Map create_queues_queues = (Map) create_queues.get(CREATE_QUEUE_QUEUES_KEY);
+
+		if (create_queues_queues == null)
+		{
+		    create_queues_queues = new ConcurrentHashMap();
+		    create_queues.put(CREATE_QUEUE_QUEUES_KEY, create_queues_queues);
+		}
+
+		// We are granting CREATE rights to all temporary queues only
+		if (parameters.length == 1)
+		{
+		    return;
+		}
+
+		// if we have a queueName then we need to store any associated exchange / rk bindings
+		if (queueName != null)
+		{
+		    Map queue = (Map) create_queues_queues.get(queueName);
+		    if (queue == null)
+		    {
+		        queue = new ConcurrentHashMap();
+		        create_queues_queues.put(queueName, queue);
+		    }
+
+		    if (exchangeName != null)
+		    {
+		        queue.put(exchangeName, routingKey);
+		    }
+
+		    //If no exchange is specified then the presence of the queueName in the map says any exchange is ok
+		}
+
+		// Store the exchange that we are being granted rights to. This will be used as part of binding
+
+		//Lookup the list of exchanges
+		Map create_queues_exchanges = (Map) create_queues.get(CREATE_QUEUE_EXCHANGES_KEY);
+
+		if (create_queues_exchanges == null)
+		{
+		    create_queues_exchanges = new ConcurrentHashMap();
+		    create_queues.put(CREATE_QUEUE_EXCHANGES_KEY, create_queues_exchanges);
+		}
+
+		//if we have an exchange
+		if (exchangeName != null)
+		{
+		    //Retrieve the list of permitted exchanges.
+		    Map exchanges = (Map) create_queues_exchanges.get(exchangeName);
+
+		    if (exchanges == null)
+		    {
+		        exchanges = new ConcurrentHashMap();
+		        create_queues_exchanges.put(exchangeName, exchanges);
+		    }
+
+		    //Store the temporary setting CREATE_QUEUE_EXCHANGES_ROUTINGKEYS_KEY
+		    exchanges.put(CREATE_QUEUE_EXCHANGES_TEMPORARY_KEY, temporary);
+
+		    //Store the binding details of queue/rk for this exchange.
+		    if (queueName != null)
+		    {
+		        //Retrieve the list of permitted routingKeys.
+		        Map rKeys = (Map) exchanges.get(exchangeName);
+
+		        if (rKeys == null)
+		        {
+		            rKeys = new ConcurrentHashMap();
+		            exchanges.put(CREATE_QUEUE_EXCHANGES_ROUTINGKEYS_KEY, rKeys);
+		        }
+
+		        rKeys.put(queueName, routingKey);
+		    }
+		}
+	}
+
+	private void grantConsume(Permission permission, Object... parameters) {
+		Map consumeRights = (Map) _permissions.get(permission);
+
+		if (consumeRights == null)
+		{
+		    consumeRights = new ConcurrentHashMap();
+		    _permissions.put(permission, consumeRights);
+		}
+
+		//if we have parametsre
+		if (parameters.length > 0)
+		{
+		    AMQShortString queueName = (AMQShortString) parameters[0];
+		    Boolean temporary = (Boolean) parameters[1];
+		    Boolean ownQueueOnly = (Boolean) parameters[2];
+
+		    if (temporary)
+		    {
+		        consumeRights.put(CONSUME_TEMPORARY_KEY, true);
+		    }
+		    else
+		    {
+		        consumeRights.put(CONSUME_TEMPORARY_KEY, false);
+		    }
+
+		    if (ownQueueOnly)
+		    {
+		        consumeRights.put(CONSUME_OWN_QUEUES_ONLY_KEY, true);
+		    }
+		    else
+		    {
+		        consumeRights.put(CONSUME_OWN_QUEUES_ONLY_KEY, false);
+		    }
+
+
+		    LinkedList queues = (LinkedList) consumeRights.get(CONSUME_QUEUES_KEY);
+		    if (queues == null)
+		    {
+		        queues = new LinkedList();
+		        consumeRights.put(CONSUME_QUEUES_KEY, queues);
+		    }
+
+		    if (queueName != null)
+		    {
+		        queues.add(queueName);
+		    }
+		}
+	}
+
     /**
      * 
      * @param permission the type of permission to check
@@ -346,267 +357,286 @@
                 return AuthzResult.ALLOWED; // This is here for completeness but the SimpleXML ACLManager never calls it.
                 // The existence of this user specific PP can be validated in the map SimpleXML maintains.
             case BIND: // Parameters : QueueBindMethod , Exchange , AMQQueue, AMQShortString routingKey
-
-                Exchange exchange = (Exchange) parameters[1];
-
-                AMQQueue bind_queueName = (AMQQueue) parameters[2];
-                AMQShortString routingKey = (AMQShortString) parameters[3];
-
-                //Get all Create Rights for this user
-                Map bindCreateRights = (Map) _permissions.get(Permission.CREATEQUEUE);
-
-                //Look up the Queue Creation Rights
-                Map bind_create_queues = (Map) bindCreateRights.get(CREATE_QUEUES_KEY);
-
-                //Lookup the list of queues
-                Map bind_create_queues_queues = (Map) bindCreateRights.get(CREATE_QUEUE_QUEUES_KEY);
-
-                // Check and see if we have a queue white list to check
-                if (bind_create_queues_queues != null)
-                {
-                    //There a white list for queues
-                    Map exchangeDetails = (Map) bind_create_queues_queues.get(bind_queueName);
-
-                    if (exchangeDetails == null) //Then all queue can be bound to all exchanges.
-                    {
-                        return AuthzResult.ALLOWED;
-                    }
-
-                    // Check to see if we have a white list of routingkeys to check
-                    Map rkeys = (Map) exchangeDetails.get(exchange.getName());
-
-                    // if keys is null then any rkey is allowed on this exchange
-                    if (rkeys == null)
-                    {
-                        // There is no routingkey white list
-                        return AuthzResult.ALLOWED;
-                    }
-                    else
-                    {
-                        // We have routingKeys so a match must be found to allowed binding
-                        Iterator keys = rkeys.keySet().iterator();
-
-                        boolean matched = false;
-                        while (keys.hasNext() && !matched)
-                        {
-                            AMQShortString rkey = (AMQShortString) keys.next();
-                            if (rkey.endsWith("*"))
-                            {
-                                matched = routingKey.startsWith(rkey.subSequence(0, rkey.length() - 1).toString());
-                            }
-                            else
-                            {
-                                matched = routingKey.equals(rkey);
-                            }
-                        }
-
-
-                        return (matched) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
-                    }
-
-
-                }
-                else
-                {
-                    //There a is no white list for queues
-
-                    // So can allow all queues to be bound
-                    //  but we should first check and see if we have a temp queue and validate that we are allowed
-                    //  to bind temp queues.
-
-                    //Check to see if we have a temporary queue
-                    if (bind_queueName.isAutoDelete())
-                    {
-                        // Check and see if we have an exchange white list.
-                        Map bind_exchanges = (Map) bind_create_queues.get(CREATE_QUEUE_EXCHANGES_KEY);
-
-                        // If the exchange exists then we must check to see if temporary queues are allowed here
-                        if (bind_exchanges != null)
-                        {
-                            // Check to see if the requested exchange is allowed.
-                            Map exchangeDetails = (Map) bind_exchanges.get(exchange.getName());
-
-                            return ((Boolean) exchangeDetails.get(CREATE_QUEUE_EXCHANGES_TEMPORARY_KEY)) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
-                        }
-
-                        //no white list so all allowed, drop through to return true below.
-                    }
-
-                    // not a temporary queue and no white list so all allowed.
-                    return AuthzResult.ALLOWED;
-                }
-
+                return authoriseBind(parameters);
             case CREATEQUEUE:// Parameters : boolean autodelete, AMQShortString name
-
-                Map createRights = (Map) _permissions.get(permission);
-
-                // If there are no create rights then deny request
-                if (createRights == null)
-                {
-                    return AuthzResult.DENIED;
-                }
-
-                //Look up the Queue Creation Rights
-                Map create_queues = (Map) createRights.get(CREATE_QUEUES_KEY);
-
-                //Lookup the list of queues allowed to be created
-                Map create_queues_queues = (Map) create_queues.get(CREATE_QUEUE_QUEUES_KEY);
-
-
-                AMQShortString queueName = (AMQShortString) parameters[1];
-                Boolean autoDelete = (Boolean) parameters[0];
-
-                if (autoDelete)// we have a temporary queue
-                {
-                    return ((Boolean) create_queues.get(CREATE_QUEUE_TEMPORARY_KEY)) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
-                }
-                else
-                {
-                    // If there is a white list then check
-                    if (create_queues_queues == null || create_queues_queues.containsKey(queueName))
-                    {
-                        return AuthzResult.ALLOWED; 
-                    }
-                    else
-                    {
-                        return AuthzResult.DENIED;
-                    }
-                        
-                }
+                return authoriseCreateQueue(permission, parameters);
             case CREATEEXCHANGE:
-                Map rights = (Map) _permissions.get(permission);
-
-                AMQShortString exchangeName = (AMQShortString) parameters[0];
-
-                // If the exchange list is doesn't exist then all is allowed else
-                // check the valid exchanges
-                if (rights == null || rights.containsKey(exchangeName))
-                {
-                    return AuthzResult.ALLOWED; 
-                }
-                else
-                {
-                    return AuthzResult.DENIED;
-                }
+                return authoriseCreateExchange(permission, parameters);
             case CONSUME: // Parameters :  AMQQueue
-
-                if (parameters.length == 1 && parameters[0] instanceof AMQQueue)
-                {
-                    AMQQueue queue = ((AMQQueue) parameters[0]);
-                    Map queuePermissions = (Map) _permissions.get(permission);
-
-                    List queues = (List) queuePermissions.get(CONSUME_QUEUES_KEY);
-
-                    Boolean temporayQueues = (Boolean) queuePermissions.get(CONSUME_TEMPORARY_KEY);
-                    Boolean ownQueuesOnly = (Boolean) queuePermissions.get(CONSUME_OWN_QUEUES_ONLY_KEY);
-
-                    // If user is allowed to publish to temporary queues and this is a temp queue then allow it.
-                    if (temporayQueues)
-                    {
-                        if (queue.isAutoDelete())
-                        // This will allow consumption from any temporary queue including ones not owned by this user.
-                        // Of course the exclusivity will not be broken.
-                        {
-                            // if not limited to ownQueuesOnly then ok else check queue Owner.
-                            return (!ownQueuesOnly || queue.getOwner().equals(_user)) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
-                        }
-                        else
-                        {
-                            return AuthzResult.DENIED;
-                        }
-                    }
-
-                    // if queues are white listed then ensure it is ok
-                    if (queues != null)
-                    {
-                        // if no queues are listed then ALL are ok othereise it must be specified.
-                        if (ownQueuesOnly)
-                        {
-                            if (queue.getOwner().equals(_user))
-                            {
-                                return (queues.size() == 0 || queues.contains(queue.getName())) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
-                            }
-                            else
-                            {
-                                return AuthzResult.DENIED;
-                            }
-                        }
-
-                        // If we are
-                        return (queues.size() == 0 || queues.contains(queue.getName())) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
-                    }
-                }
-
-                // Can't authenticate without the right parameters
-                return AuthzResult.DENIED;
-            case DELETE:
-                break;
-
+                return authoriseConsume(permission, parameters);
             case PUBLISH: // Parameters : Exchange exchange, AMQShortString routingKey
-                Map publishRights = (Map) _permissions.get(permission);
-
-                if (publishRights == null)
-                {
-                    return AuthzResult.DENIED;
-                }
-
-                Map exchanges = (Map) publishRights.get(PUBLISH_EXCHANGES_KEY);
-
-                // Having no exchanges listed gives full publish rights to all exchanges
-                if (exchanges == null)
-                {
-                    return AuthzResult.ALLOWED;
-                }
-                // Otherwise exchange must be listed in the white list
-
-                // If the map doesn't have the exchange then it isn't allowed
-                if (!exchanges.containsKey(((Exchange) parameters[0]).getName()))
-                {
-                    return AuthzResult.DENIED;
-                }
-                else
-                {
-
-                    // Get valid routing keys
-                    HashSet routingKeys = (HashSet) exchanges.get(((Exchange)parameters[0]).getName());
-
-                    // Having no routingKeys in the map then all are allowed.
-                    if (routingKeys == null)
-                    {
-                        return AuthzResult.ALLOWED;
-                    }
-                    else
-                    {
-                        // We have routingKeys so a match must be found to allowed binding
-                        Iterator keys = routingKeys.iterator();
-
-
-                        AMQShortString publishRKey = (AMQShortString)parameters[1];
-
-                        boolean matched = false;
-                        while (keys.hasNext() && !matched)
-                        {
-                            AMQShortString rkey = (AMQShortString) keys.next();
-
-                            if (rkey.endsWith("*"))
-                            {
-                                matched = publishRKey.startsWith(rkey.subSequence(0, rkey.length() - 1));
-                            }
-                            else
-                            {
-                                matched = publishRKey.equals(rkey);
-                            }
-                        }
-                        return (matched) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
-                    }
-                }
+                return authorisePublish(permission, parameters);
+            /* Fall through */
+            case DELETE:
             case PURGE:
-                break;
             case UNBIND:
-                break;
-
+            default:
+                return AuthzResult.DENIED;
         }
 
-        return AuthzResult.DENIED;
     }
+
+	private AuthzResult authoriseConsume(Permission permission, Object... parameters) {
+		if (parameters.length == 1 && parameters[0] instanceof AMQQueue)
+		{
+		    AMQQueue queue = ((AMQQueue) parameters[0]);
+		    Map queuePermissions = (Map) _permissions.get(permission);
+		    
+		    if (queuePermissions == null)
+		    {
+		    	//if the outer map is null, the user has no CONSUME rights at all
+		    	return AuthzResult.DENIED;
+		    }
+
+		    List queues = (List) queuePermissions.get(CONSUME_QUEUES_KEY);
+
+		    Boolean temporayQueues = (Boolean) queuePermissions.get(CONSUME_TEMPORARY_KEY);
+		    Boolean ownQueuesOnly = (Boolean) queuePermissions.get(CONSUME_OWN_QUEUES_ONLY_KEY);
+
+		    // If user is allowed to publish to temporary queues and this is a temp queue then allow it.
+		    if (temporayQueues)
+		    {
+		        if (queue.isAutoDelete())
+		        // This will allow consumption from any temporary queue including ones not owned by this user.
+		        // Of course the exclusivity will not be broken.
+		        {
+		            // if not limited to ownQueuesOnly then ok else check queue Owner.
+		            return (!ownQueuesOnly || queue.getOwner().equals(_user)) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
+		        }
+		        else
+		        {
+		            return AuthzResult.DENIED;
+		        }
+		    }
+
+		    // if queues are white listed then ensure it is ok
+		    if (queues != null)
+		    {
+		        // if no queues are listed then ALL are ok othereise it must be specified.
+		        if (ownQueuesOnly)
+		        {
+		            if (queue.getOwner().equals(_user))
+		            {
+		                return (queues.size() == 0 || queues.contains(queue.getName())) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
+		            }
+		            else
+		            {
+		                return AuthzResult.DENIED;
+		            }
+		        }
+
+		        // If we are
+		        return (queues.size() == 0 || queues.contains(queue.getName())) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
+		    }
+		}
+
+		// Can't authenticate without the right parameters
+		return AuthzResult.DENIED;
+	}
+
+	private AuthzResult authorisePublish(Permission permission, Object... parameters) {
+		Map publishRights = (Map) _permissions.get(permission);
+
+		if (publishRights == null)
+		{
+		    return AuthzResult.DENIED;
+		}
+
+		Map exchanges = (Map) publishRights.get(PUBLISH_EXCHANGES_KEY);
+
+		// Having no exchanges listed gives full publish rights to all exchanges
+		if (exchanges == null)
+		{
+		    return AuthzResult.ALLOWED;
+		}
+		// Otherwise exchange must be listed in the white list
+
+		// If the map doesn't have the exchange then it isn't allowed
+		if (!exchanges.containsKey(((Exchange) parameters[0]).getName()))
+		{
+		    return AuthzResult.DENIED;
+		}
+		else
+		{
+
+		    // Get valid routing keys
+		    HashSet routingKeys = (HashSet) exchanges.get(((Exchange)parameters[0]).getName());
+
+		    // Having no routingKeys in the map then all are allowed.
+		    if (routingKeys == null)
+		    {
+		        return AuthzResult.ALLOWED;
+		    }
+		    else
+		    {
+		        // We have routingKeys so a match must be found to allowed binding
+		        Iterator keys = routingKeys.iterator();
+
+
+		        AMQShortString publishRKey = (AMQShortString)parameters[1];
+
+		        boolean matched = false;
+		        while (keys.hasNext() && !matched)
+		        {
+		            AMQShortString rkey = (AMQShortString) keys.next();
+
+		            if (rkey.endsWith("*"))
+		            {
+		                matched = publishRKey.startsWith(rkey.subSequence(0, rkey.length() - 1));
+		            }
+		            else
+		            {
+		                matched = publishRKey.equals(rkey);
+		            }
+		        }
+		        return (matched) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
+		    }
+		}
+	}
+
+	private AuthzResult authoriseCreateExchange(Permission permission, Object... parameters) {
+		Map rights = (Map) _permissions.get(permission);
+
+		AMQShortString exchangeName = (AMQShortString) parameters[0];
+
+		// If the exchange list is doesn't exist then all is allowed else
+		// check the valid exchanges
+		if (rights == null || rights.containsKey(exchangeName))
+		{
+		    return AuthzResult.ALLOWED; 
+		}
+		else
+		{
+		    return AuthzResult.DENIED;
+		}
+	}
+
+	private AuthzResult authoriseCreateQueue(Permission permission, Object... parameters) {
+		Map createRights = (Map) _permissions.get(permission);
+
+		// If there are no create rights then deny request
+		if (createRights == null)
+		{
+		    return AuthzResult.DENIED;
+		}
+
+		//Look up the Queue Creation Rights
+		Map create_queues = (Map) createRights.get(CREATE_QUEUES_KEY);
+
+		//Lookup the list of queues allowed to be created
+		Map create_queues_queues = (Map) create_queues.get(CREATE_QUEUE_QUEUES_KEY);
+
+
+		AMQShortString queueName = (AMQShortString) parameters[1];
+		Boolean autoDelete = (Boolean) parameters[0];
+
+		if (autoDelete)// we have a temporary queue
+		{
+		    return ((Boolean) create_queues.get(CREATE_QUEUE_TEMPORARY_KEY)) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
+		}
+		else
+		{
+		    // If there is a white list then check
+		    if (create_queues_queues == null || create_queues_queues.containsKey(queueName))
+		    {
+		        return AuthzResult.ALLOWED; 
+		    }
+		    else
+		    {
+		        return AuthzResult.DENIED;
+		    }
+		        
+		}
+	}
+
+	private AuthzResult authoriseBind(Object... parameters) {
+		Exchange exchange = (Exchange) parameters[1];
+
+		AMQQueue bind_queueName = (AMQQueue) parameters[2];
+		AMQShortString routingKey = (AMQShortString) parameters[3];
+
+		//Get all Create Rights for this user
+		Map bindCreateRights = (Map) _permissions.get(Permission.CREATEQUEUE);
+
+		//Look up the Queue Creation Rights
+		Map bind_create_queues = (Map) bindCreateRights.get(CREATE_QUEUES_KEY);
+
+		//Lookup the list of queues
+		Map bind_create_queues_queues = (Map) bindCreateRights.get(CREATE_QUEUE_QUEUES_KEY);
+
+		// Check and see if we have a queue white list to check
+		if (bind_create_queues_queues != null)
+		{
+		    //There a white list for queues
+		    Map exchangeDetails = (Map) bind_create_queues_queues.get(bind_queueName);
+
+		    if (exchangeDetails == null) //Then all queue can be bound to all exchanges.
+		    {
+		        return AuthzResult.ALLOWED;
+		    }
+
+		    // Check to see if we have a white list of routingkeys to check
+		    Map rkeys = (Map) exchangeDetails.get(exchange.getName());
+
+		    // if keys is null then any rkey is allowed on this exchange
+		    if (rkeys == null)
+		    {
+		        // There is no routingkey white list
+		        return AuthzResult.ALLOWED;
+		    }
+		    else
+		    {
+		        // We have routingKeys so a match must be found to allowed binding
+		        Iterator keys = rkeys.keySet().iterator();
+
+		        boolean matched = false;
+		        while (keys.hasNext() && !matched)
+		        {
+		            AMQShortString rkey = (AMQShortString) keys.next();
+		            if (rkey.endsWith("*"))
+		            {
+		                matched = routingKey.startsWith(rkey.subSequence(0, rkey.length() - 1).toString());
+		            }
+		            else
+		            {
+		                matched = routingKey.equals(rkey);
+		            }
+		        }
+
+
+		        return (matched) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
+		    }
+
+
+		}
+		else
+		{
+		    //There a is no white list for queues
+
+		    // So can allow all queues to be bound
+		    //  but we should first check and see if we have a temp queue and validate that we are allowed
+		    //  to bind temp queues.
+
+		    //Check to see if we have a temporary queue
+		    if (bind_queueName.isAutoDelete())
+		    {
+		        // Check and see if we have an exchange white list.
+		        Map bind_exchanges = (Map) bind_create_queues.get(CREATE_QUEUE_EXCHANGES_KEY);
+
+		        // If the exchange exists then we must check to see if temporary queues are allowed here
+		        if (bind_exchanges != null)
+		        {
+		            // Check to see if the requested exchange is allowed.
+		            Map exchangeDetails = (Map) bind_exchanges.get(exchange.getName());
+
+		            return ((Boolean) exchangeDetails.get(CREATE_QUEUE_EXCHANGES_TEMPORARY_KEY)) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
+		        }
+
+		        //no white list so all allowed, drop through to return true below.
+		    }
+
+		    // not a temporary queue and no white list so all allowed.
+		    return AuthzResult.ALLOWED;
+		}
+	}
 }

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java Sun Oct 11 23:22:08 2009
@@ -35,28 +35,24 @@
     // Returns true or false if the plugin should authorise or deny the request    
     protected abstract AuthzResult getResult();
     
-    @Override
     public AuthzResult authoriseBind(AMQProtocolSession session, Exchange exch,
             AMQQueue queue, AMQShortString routingKey)
     {
         return getResult();
     }
 
-    @Override
     public AuthzResult authoriseConnect(AMQProtocolSession session,
             VirtualHost virtualHost)
     {
         return getResult();
     }
 
-    @Override
     public AuthzResult authoriseConsume(AMQProtocolSession session, boolean noAck,
             AMQQueue queue)
     {
         return getResult();    
     }
 
-    @Override
     public AuthzResult authoriseConsume(AMQProtocolSession session,
             boolean exclusive, boolean noAck, boolean noLocal, boolean nowait,
             AMQQueue queue)
@@ -64,7 +60,6 @@
         return getResult();
     }
 
-    @Override
     public AuthzResult authoriseCreateExchange(AMQProtocolSession session,
             boolean autoDelete, boolean durable, AMQShortString exchangeName,
             boolean internal, boolean nowait, boolean passive,
@@ -73,7 +68,6 @@
         return getResult();
     }
 
-    @Override
     public AuthzResult authoriseCreateQueue(AMQProtocolSession session,
             boolean autoDelete, boolean durable, boolean exclusive,
             boolean nowait, boolean passive, AMQShortString queue)
@@ -81,19 +75,16 @@
         return getResult();
     }
 
-    @Override
     public AuthzResult authoriseDelete(AMQProtocolSession session, AMQQueue queue)
     {
         return getResult();
     }
 
-    @Override
     public AuthzResult authoriseDelete(AMQProtocolSession session, Exchange exchange)
     {
         return getResult();
     }
 
-    @Override
     public AuthzResult authorisePublish(AMQProtocolSession session,
             boolean immediate, boolean mandatory, AMQShortString routingKey,
             Exchange e)
@@ -101,20 +92,17 @@
         return getResult();
     }
 
-    @Override
     public AuthzResult authorisePurge(AMQProtocolSession session, AMQQueue queue)
     {
         return getResult();
     }
 
-    @Override
     public AuthzResult authoriseUnbind(AMQProtocolSession session, Exchange exch,
             AMQShortString routingKey, AMQQueue queue)
     {
         return getResult();
     }
 
-    @Override
     public void setConfiguration(Configuration config)
     {
         // no-op

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java Sun Oct 11 23:22:08 2009
@@ -211,7 +211,6 @@
 
     }
 
-    @Override
     public void setConfiguration(Configuration config) throws ConfigurationException
     {
         // Get default action

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabaseManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabaseManager.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabaseManager.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabaseManager.java Sun Oct 11 23:22:08 2009
@@ -42,7 +42,6 @@
         return _databases;
     }
 
-    @Override
     public void initialiseManagement(ServerConfiguration _configuration) throws ConfigurationException
     {
         //todo

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Sun Oct 11 23:22:08 2009
@@ -630,11 +630,19 @@
 
     public QueueEntry getLastSeenEntry()
     {
-        return _queueContext.get();
+        QueueEntry entry = _queueContext.get();
+
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug(_logActor + ": lastSeenEntry: " + (entry == null ? "null" : entry.debugIdentity()));
+        }        
+
+        return entry;
     }
 
     public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newvalue)
     {
+        _logger.debug(debugIdentity() + " Setting Last Seen To:" + (newvalue == null ? "nullNV" : newvalue.debugIdentity()));
         return _queueContext.compareAndSet(expected,newvalue);
     }
 

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java Sun Oct 11 23:22:08 2009
@@ -97,6 +97,7 @@
             try
             {
                 QueueEntry entry = _queue.enqueue(getStoreContext(),_message);
+                _queue.checkCapacity(_channel);
 
                 if(entry.immediateAndNotDelivered())
                 {

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java Sun Oct 11 23:22:08 2009
@@ -91,6 +91,8 @@
     public void deliver(final AMQQueue queue, AMQMessage message) throws AMQException
     {
         QueueEntry entry = queue.enqueue(_storeContext, message);
+        queue.checkCapacity(_channel);
+
         
         //following check implements the functionality
         //required by the 'immediate' flag:

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java Sun Oct 11 23:22:08 2009
@@ -31,6 +31,7 @@
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.AMQException;
 import org.apache.commons.configuration.Configuration;
 
@@ -271,6 +272,15 @@
         //To change body of implemented methods use File | Settings | File Templates.
     }
 
+    public boolean getBlockOnQueueFull()
+    {
+        return false;
+    }
+
+    public void setBlockOnQueueFull(boolean block)
+    {
+    }
+
     public long getMinimumAlertRepeatGap()
     {
         return 0;  //To change body of implemented methods use File | Settings | File Templates.
@@ -285,8 +295,8 @@
     {
         return 0;  //To change body of implemented methods use File | Settings | File Templates.
     }
+   
 
-    @Override
     public void checkMessageStatus() throws AMQException
     {
         //To change body of implemented methods use File | Settings | File Templates.
@@ -317,6 +327,10 @@
         //To change body of implemented methods use File | Settings | File Templates.
     }
 
+    public void checkCapacity(AMQChannel channel)
+    {               
+    }
+
     public ManagedObject getManagedObject()
     {
         return null;  //To change body of implemented methods use File | Settings | File Templates.
@@ -327,12 +341,31 @@
         return 0;  //To change body of implemented methods use File | Settings | File Templates.
     }
 
-    @Override
     public void setMinimumAlertRepeatGap(long value)
     {
         
     }
 
+    public long getCapacity()
+    {
+        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void setCapacity(long capacity)
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public long getFlowResumeCapacity()
+    {
+        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void setFlowResumeCapacity(long flowResumeCapacity)
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
     public void configure(QueueConfiguration config)
     {
         

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/registry/ApplicationRegistryShutdownTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/registry/ApplicationRegistryShutdownTest.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/registry/ApplicationRegistryShutdownTest.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/registry/ApplicationRegistryShutdownTest.java Sun Oct 11 23:22:08 2009
@@ -22,7 +22,6 @@
 
 import junit.framework.TestCase;
 import org.apache.qpid.server.util.TestApplicationRegistry;
-import org.apache.qpid.AMQException;
 
 import java.security.Security;
 import java.security.Provider;
@@ -69,7 +68,7 @@
         // Register new providers
         try
         {
-            _registry.initialise();
+            _registry.initialise(ApplicationRegistry.DEFAULT_INSTANCE);
         }
         catch (Exception e)
         {

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java Sun Oct 11 23:22:08 2009
@@ -27,6 +27,7 @@
 import org.apache.qpid.server.logging.RootMessageLoggerImpl;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.actors.TestLogActor;
+import org.apache.qpid.server.logging.actors.BrokerActor;
 import org.apache.qpid.server.logging.rawloggers.Log4jMessageLogger;
 import org.apache.qpid.server.management.NoopManagedObjectRegistry;
 import org.apache.qpid.server.plugins.PluginManager;
@@ -50,7 +51,7 @@
         super(new ServerConfiguration(new PropertiesConfiguration()));
     }
 
-    public void initialise() throws Exception
+    public void initialise(int instanceID) throws Exception
     {
         _logger.info("Initialising NullApplicationRegistry");
 
@@ -92,6 +93,8 @@
     @Override
     public void close() throws Exception
     {
+        CurrentActor.set(new BrokerActor(_rootMessageLogger));
+
         try
         {
             super.close();                                                  

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java Sun Oct 11 23:22:08 2009
@@ -21,7 +21,6 @@
 package org.apache.qpid.server.util;
 
 import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.MapConfiguration;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.qpid.server.configuration.ServerConfiguration;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
@@ -31,7 +30,6 @@
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.security.access.ACLManager;
-import org.apache.qpid.server.security.access.ACLPlugin;
 import org.apache.qpid.server.security.access.plugins.AllowAll;
 import org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabaseManager;
 import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
@@ -45,7 +43,6 @@
 import org.apache.qpid.server.logging.rawloggers.Log4jMessageLogger;
 
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Properties;
 import java.util.Arrays;
 
@@ -75,7 +72,7 @@
     	_config = config;
     }
 
-    public void initialise() throws Exception
+    public void initialise(int instanceID) throws Exception
     {
         _rootMessageLogger = new RootMessageLoggerImpl(_configuration,
                                                        new Log4jMessageLogger());

Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Sun Oct 11 23:22:08 2009
@@ -313,7 +313,7 @@
     protected AMQConnectionDelegate _delegate;
 
     // this connection maximum number of prefetched messages
-    protected int _maxPrefetch;
+    private int _maxPrefetch;
 
     //Indicates whether persistent messages are synchronized
     private boolean _syncPersistence;
@@ -450,7 +450,7 @@
         }
         else
         {
-            // use the defaul value set for all connections
+            // use the default value set for all connections
             _syncPublish = System.getProperty((ClientProperties.SYNC_ACK_PROP_NAME),_syncPublish);
         }
         
@@ -512,7 +512,7 @@
 
         boolean retryAllowed = true;
         Exception connectionException = null;
-        while (!_connected && retryAllowed)
+        while (!_connected && retryAllowed && brokerDetails != null)
         {
             ProtocolVersion pe = null;
             try
@@ -691,12 +691,12 @@
 
     public boolean attemptReconnection()
     {
-        while (_failoverPolicy.failoverAllowed())
+        BrokerDetails broker = null;
+        while (_failoverPolicy.failoverAllowed() && (broker = _failoverPolicy.getNextBrokerDetails()) != null)
         {
             try
             {
-                makeBrokerConnection(_failoverPolicy.getNextBrokerDetails());
-
+                makeBrokerConnection(broker);
                 return true;
             }
             catch (Exception e)

Modified: qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java?rev=824198&r1=824197&r2=824198&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java Sun Oct 11 23:22:08 2009
@@ -39,6 +39,15 @@
     Session createSession(final boolean transacted, final int acknowledgeMode,
      final int prefetchHigh, final int prefetchLow) throws JMSException;
 
+    /**
+     * Create an XASession with default prefetch values of:
+     * High = MaxPrefetch
+     * Low  = MaxPrefetch / 2
+     * @return XASession
+     * @throws JMSException thrown if there is a problem creating the session.
+     */
+    XASession createXASession() throws JMSException;
+
     XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException;
 
     void failoverPrep();



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org