You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/10/20 18:23:11 UTC

svn commit: r827724 [4/8] - in /qpid/branches/java-broker-0-10/qpid/java: ./ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/ broker/src/main/java/org/apach...

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Tue Oct 20 16:23:01 2009
@@ -19,13 +19,11 @@
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.store.TransactionLog;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.SubscriptionList;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.PrincipalHolder;
+import org.apache.qpid.server.security.PrincipalHolder;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.actors.QueueActor;
 import org.apache.qpid.server.logging.subjects.QueueLogSubject;
@@ -33,7 +31,7 @@
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.messages.QueueMessages;
 import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.txn.Transaction;
+import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.LocalTransaction;
 
@@ -61,62 +59,40 @@
 {
     private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
 
-    private int unused;
-
-    private PrincipalHolder _prinicpalHolder;
-
-
-    private Object _exclusiveOwner;
-
-    private Exchange _alternateExchange;
-
-
-    static final class QueueContext implements Context
-    {
-        volatile QueueEntry _lastSeenEntry;
-        volatile QueueEntry _releasedEntry;
-
-        public QueueContext(QueueEntry head)
-        {
-            _lastSeenEntry = head;
-        }
-
-        public QueueEntry getLastSeenEntry()
-        {
-            return _lastSeenEntry;
-        }
-    }
-
-
-    static final AtomicReferenceFieldUpdater<QueueContext, QueueEntry>
-            _lastSeenUpdater =
-        AtomicReferenceFieldUpdater.newUpdater
-        (QueueContext.class, QueueEntry.class, "_lastSeenEntry");
-
-    static final AtomicReferenceFieldUpdater<QueueContext, QueueEntry>
-            _releasedUpdater =
-        AtomicReferenceFieldUpdater.newUpdater
-        (QueueContext.class, QueueEntry.class, "_releasedEntry");
 
+    private final VirtualHost _virtualHost;
 
     private final AMQShortString _name;
+    private final String _resourceName;
 
     /** null means shared */
     private final AMQShortString _owner;
 
+    private PrincipalHolder _prinicpalHolder;
+
+    private Object _exclusiveOwner;
+
+
     private final boolean _durable;
 
     /** If true, this queue is deleted when the last subscriber is removed */
     private final boolean _autoDelete;
 
-    private final VirtualHost _virtualHost;
+    private Exchange _alternateExchange;
 
     /** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */
     private final ExchangeBindings _bindings = new ExchangeBindings(this);
 
-    private final AtomicBoolean _deleted = new AtomicBoolean(false);
 
-    private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
+    protected final QueueEntryList _entries;
+
+    protected final SubscriptionList _subscriptionList = new SubscriptionList(this);
+
+    private final AtomicReference<SubscriptionList.SubscriptionNode> _lastSubscriptionNode = new AtomicReference<SubscriptionList.SubscriptionNode>(_subscriptionList.getHead());
+
+    private volatile Subscription _exclusiveSubscriber;
+
+
 
     private final AtomicInteger _atomicQueueCount = new AtomicInteger(0);
 
@@ -124,18 +100,10 @@
 
     private final AtomicInteger _activeSubscriberCount = new AtomicInteger();
 
-    protected final SubscriptionList _subscriptionList = new SubscriptionList(this);
-    private final AtomicReference<SubscriptionList.SubscriptionNode> _lastSubscriptionNode = new AtomicReference<SubscriptionList.SubscriptionNode>(_subscriptionList.getHead());
+    private final AtomicLong _totalMessagesReceived = new AtomicLong();
 
-    private volatile Subscription _exclusiveSubscriber;
 
-    protected final QueueEntryList _entries;
 
-    private final AMQQueueMBean _managedObject;
-    private final Executor _asyncDelivery;
-    private final AtomicLong _totalMessagesReceived = new AtomicLong();
-
-    private final ConcurrentMap<AMQChannel, Boolean> _blockedChannels = new ConcurrentHashMap<AMQChannel, Boolean>();
 
     /** max allowed size(KB) of a single message */
     public long _maximumMessageSize = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageSize();
@@ -152,23 +120,37 @@
     /** the minimum interval between sending out consecutive alerts of the same type */
     public long _minimumAlertRepeatGap = ApplicationRegistry.getInstance().getConfiguration().getMinimumAlertRepeatGap();
 
-    private static final int MAX_ASYNC_DELIVERIES = 10;
+    private long _capacity = ApplicationRegistry.getInstance().getConfiguration().getCapacity();
+
+    private long _flowResumeCapacity = ApplicationRegistry.getInstance().getConfiguration().getFlowResumeCapacity();
 
     private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class);
 
+
+    static final int MAX_ASYNC_DELIVERIES = 10;
+
+
     private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE);
     private AtomicReference _asynchronousRunner = new AtomicReference(null);
+    private final Executor _asyncDelivery;
     private AtomicInteger _deliveredMessages = new AtomicInteger();
     private AtomicBoolean _stopped = new AtomicBoolean(false);
+
+    private final ConcurrentMap<AMQChannel, Boolean> _blockedChannels = new ConcurrentHashMap<AMQChannel, Boolean>();
+
+    private final AtomicBoolean _deleted = new AtomicBoolean(false);
+    private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
+
+
     private LogSubject _logSubject;
     private LogActor _logActor;
 
+    private AMQQueueMBean _managedObject;
+    private static final String SUB_FLUSH_RUNNER = "SUB_FLUSH_RUNNER";
+    private boolean _nolocal;
 
-    private long _capacity = ApplicationRegistry.getInstance().getConfiguration().getCapacity();
-    private long _flowResumeCapacity = ApplicationRegistry.getInstance().getConfiguration().getFlowResumeCapacity();
 
     protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
-            throws AMQException
     {
         this(name, durable, owner, autoDelete, virtualHost, new SimpleQueueEntryList.Factory());
     }
@@ -179,7 +161,6 @@
                              boolean autoDelete,
                              VirtualHost virtualHost,
                              QueueEntryListFactory entryListFactory)
-            throws AMQException
     {
 
         if (name == null)
@@ -193,6 +174,7 @@
         }
 
         _name = name;
+        _resourceName = String.valueOf(name);
         _durable = durable;
         _owner = owner;
         _autoDelete = autoDelete;
@@ -231,7 +213,7 @@
         }
         catch (JMException e)
         {
-            throw new AMQException("AMQQueue MBean creation has failed ", e);
+            _logger.error("AMQQueue MBean creation has failed ", e);
         }
 
         resetNotifications();
@@ -255,11 +237,21 @@
 
     // ------ Getters and Setters
 
+    public void execute(ReadWriteRunnable runnable)
+    {
+        _asyncDelivery.execute(runnable);
+    }
+
     public AMQShortString getName()
     {
         return _name;
     }
 
+    public void setNoLocal(boolean nolocal)
+    {
+        _nolocal = nolocal;
+    }
+
     public boolean isDurable()
     {
         return _durable;
@@ -401,6 +393,7 @@
         if (!isDeleted())
         {
             subscription.setQueue(this, exclusive);
+            subscription.setNoLocal(_nolocal);
             _subscriptionList.add(subscription);
             if (isDeleted())
             {
@@ -540,7 +533,10 @@
             deliverAsync();
         }
 
-        _managedObject.checkForNotification(entry.getMessage());
+        if(_managedObject != null)
+        {
+            _managedObject.checkForNotification(entry.getMessage());
+        }
 
         return entry;
     }
@@ -612,10 +608,10 @@
         QueueContext subContext = (QueueContext) sub.getQueueContext();
         QueueEntry releasedEntry = subContext._releasedEntry;
 
-        _lastSeenUpdater.set(subContext, entry);
+        QueueContext._lastSeenUpdater.set(subContext, entry);
         if(releasedEntry == entry)
         {
-           _releasedUpdater.compareAndSet(subContext, releasedEntry, null);
+           QueueContext._releasedUpdater.compareAndSet(subContext, releasedEntry, null);
         }
     }
 
@@ -629,7 +625,7 @@
 
             while((oldEntry  = subContext._releasedEntry) == null || oldEntry.compareTo(entry) > 0)
             {
-                if(_releasedUpdater.compareAndSet(subContext, oldEntry, entry))
+                if(QueueContext._releasedUpdater.compareAndSet(subContext, oldEntry, entry))
                 {
                     break;
                 }
@@ -939,11 +935,11 @@
     public void moveMessagesToAnotherQueue(final long fromMessageId,
                                            final long toMessageId,
                                            String queueName,
-                                           StoreContext storeContext)
+                                           ServerTransaction txn)
     {
 
-        AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
-        TransactionLog txnLog = getVirtualHost().getTransactionLog();
+        final AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
+
 
         List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
         {
@@ -962,62 +958,48 @@
             }
         });
 
-        try
+
+
+        // Move the messages in on the message store.
+        for (final QueueEntry entry : entries)
         {
-            txnLog.beginTran(storeContext);
+            final ServerMessage message = entry.getMessage();
+            txn.enqueue(toQueue, message,
+                        new ServerTransaction.Action()
+                        {
 
-            // Move the messages in on the message store.
-            for (QueueEntry entry : entries)
-            {
-                ServerMessage message = entry.getMessage();
+                            public void postCommit()
+                            {
+                                try
+                                {
+                                    toQueue.enqueue(message);
+                                }
+                                catch (AMQException e)
+                                {
+                                    throw new RuntimeException(e);
+                                }
+                            }
 
-                if (message.isPersistent() && toQueue.isDurable())
-                {
-                    txnLog.enqueueMessage(storeContext, toQueue, message.getMessageNumber());
-                }
-                // dequeue does not decrement the refence count
-                entry.dequeue();
-            }
+                            public void onRollback()
+                            {
+                                entry.release();
+                            }
+                        });
+            txn.dequeue(this, message,
+                        new ServerTransaction.Action()
+                        {
 
-            // Commit and flush the move transcations.
-            try
-            {
-                txnLog.commitTran(storeContext);
-            }
-            catch (AMQException e)
-            {
-                throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e);
-            }
-        }
-        catch (AMQException e)
-        {
-            try
-            {
-                txnLog.abortTran(storeContext);
-            }
-            catch (AMQException rollbackEx)
-            {
-                _logger.error("Failed to rollback transaction when error occured moving messages", rollbackEx);
-            }
-            throw new RuntimeException(e);
-        }
+                            public void postCommit()
+                            {
+                                entry.discard();
+                            }
 
-        try
-        {
+                            public void onRollback()
+                            {
+
+                            }
+                        });
 
-            for (QueueEntry entry : entries)
-            {
-                toQueue.enqueue(entry.getMessage());
-                entry.delete();
-            }
-        }
-        catch (MessageCleanupException e)
-        {
-            throw new RuntimeException(e);
-        }
-        catch (AMQException e)
-        {
-            throw new RuntimeException(e);
         }
 
     }
@@ -1025,10 +1007,9 @@
     public void copyMessagesToAnotherQueue(final long fromMessageId,
                                            final long toMessageId,
                                            String queueName,
-                                           final StoreContext storeContext)
+                                           final ServerTransaction txn)
     {
-        AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
-        TransactionLog txnLog = getVirtualHost().getTransactionLog();
+        final AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
 
         List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
         {
@@ -1046,66 +1027,37 @@
             }
         });
 
-        try
+
+        // Move the messages in on the message store.
+        for (QueueEntry entry : entries)
         {
-            txnLog.beginTran(storeContext);
+            final ServerMessage message = entry.getMessage();
 
-            // Move the messages in on the message store.
-            for (QueueEntry entry : entries)
+            if (message.isPersistent() && toQueue.isDurable())
             {
-                ServerMessage message = entry.getMessage();
 
-                if (message.isPersistent() && toQueue.isDurable())
-                {
-
-                    txnLog.enqueueMessage(storeContext, toQueue, message.getMessageNumber());
-
-                }
-            }
+                txn.enqueue(toQueue, message, new ServerTransaction.Action()
+                    {
+                        public void postCommit()
+                        {
+                            try
+                            {
+                                toQueue.enqueue(message);
+                            }
+                            catch (AMQException e)
+                            {
+                                throw new RuntimeException(e);
+                            }
+                        }
 
-            // Commit and flush the move transcations.
-            try
-            {
-                txnLog.commitTran(storeContext);
-            }
-            catch (AMQException e)
-            {
-                throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e);
-            }
-        }
-        catch (AMQException e)
-        {
-            try
-            {
-                txnLog.abortTran(storeContext);
-            }
-            catch (AMQException rollbackEx)
-            {
-                _logger.error("Failed to rollback transaction when error occured moving messages", rollbackEx);
-            }
-            throw new RuntimeException(e);
-        }
+                        public void onRollback()
+                        {
 
-        try
-        {
-            for (QueueEntry entry : entries)
-            {
+                        }
+                    });
 
-                ServerMessage message = entry.getMessage();
-                if (message != null)
-                {
-                    toQueue.enqueue(entry.getMessage());
-                }
             }
         }
-        catch (MessageCleanupException e)
-        {
-            throw new RuntimeException(e);
-        }
-        catch (AMQException e)
-        {
-            throw new RuntimeException(e);
-        }
 
     }
 
@@ -1160,7 +1112,7 @@
         QueueEntryIterator queueListIterator = _entries.iterator();
         long count = 0;
 
-        Transaction txn = new LocalTransaction(getVirtualHost().getTransactionLog());
+        ServerTransaction txn = new LocalTransaction(getVirtualHost().getTransactionLog());
 
         while (queueListIterator.advance())
         {
@@ -1181,14 +1133,14 @@
 
     private void dequeueEntry(final QueueEntry node)
     {
-        Transaction txn = new AutoCommitTransaction(getVirtualHost().getTransactionLog());
+        ServerTransaction txn = new AutoCommitTransaction(getVirtualHost().getTransactionLog());
         dequeueEntry(node, txn);
     }
 
-    private void dequeueEntry(final QueueEntry node, Transaction txn)
+    private void dequeueEntry(final QueueEntry node, ServerTransaction txn)
     {
         txn.dequeue(this, node.getMessage(),
-                    new Transaction.Action()
+                    new ServerTransaction.Action()
                     {
 
                         public void postCommit()
@@ -1241,7 +1193,7 @@
                 }
             });
 
-            Transaction txn = new LocalTransaction(getVirtualHost().getTransactionLog());
+            ServerTransaction txn = new LocalTransaction(getVirtualHost().getTransactionLog());
 
             if(_alternateExchange != null)
             {
@@ -1255,7 +1207,7 @@
                     if(rerouteQueues != null & rerouteQueues.size() != 0)
                     {
                         txn.enqueue(rerouteQueues, entry.getMessage(),
-                                    new Transaction.Action()
+                                    new ServerTransaction.Action()
                                     {
 
                                         public void postCommit()
@@ -1280,7 +1232,7 @@
                                         }
                                     });
                         txn.dequeue(this, entry.getMessage(),
-                                    new Transaction.Action()
+                                    new ServerTransaction.Action()
                                     {
 
                                         public void postCommit()
@@ -1308,7 +1260,7 @@
                     if(message != null)
                     {
                         txn.dequeue(this, message,
-                                    new Transaction.Action()
+                                    new ServerTransaction.Action()
                                     {
 
                                         public void postCommit()
@@ -1327,7 +1279,10 @@
             txn.commit();
 
 
-            _managedObject.unregister();
+            if(_managedObject!=null)
+            {
+                _managedObject.unregister();
+            }
 
             for (Task task : _deleteTaskList)
             {
@@ -1417,7 +1372,13 @@
 
     public void deliverAsync(Subscription sub)
     {
-        _asyncDelivery.execute(new SubFlushRunner(sub));
+        SubFlushRunner flusher = (SubFlushRunner) sub.get(SUB_FLUSH_RUNNER);
+        if(flusher == null)
+        {
+            flusher = new SubFlushRunner(sub);
+            sub.set(SUB_FLUSH_RUNNER, flusher);
+        }
+        _asyncDelivery.execute(flusher);
     }
 
 
@@ -1466,66 +1427,12 @@
         }
     }
 
-    private class SubFlushRunner implements ReadWriteRunnable
-    {
-        private final Subscription _sub;
-
-        public SubFlushRunner(Subscription sub)
-        {
-            _sub = sub;
-        }
-
-        public void run()
-        {
-
-            String originalName = Thread.currentThread().getName();
-            try{
-                Thread.currentThread().setName("SubFlushRunner-"+_sub);
-
-                boolean complete = false;
-                try
-                {
-                    CurrentActor.set(_sub.getLogActor());
-                    complete = flushSubscription(_sub, new Long(MAX_ASYNC_DELIVERIES));
-
-                }
-                catch (AMQException e)
-                {
-                    _logger.error(e);
-                }
-                finally
-                {
-                    CurrentActor.remove();
-                }
-                if (!complete && !_sub.isSuspended())
-                {
-                    _asyncDelivery.execute(this);
-                }
-            }
-            finally
-            {
-                Thread.currentThread().setName(originalName);
-            }
-
-        }
-
-        public boolean isRead()
-        {
-            return false;
-        }
-
-        public boolean isWrite()
-        {
-            return true;
-        }
-    }
-
     public void flushSubscription(Subscription sub) throws AMQException
     {
         flushSubscription(sub, Long.MAX_VALUE);
     }
 
-    public boolean flushSubscription(Subscription sub, Long iterations) throws AMQException
+    public boolean flushSubscription(Subscription sub, long iterations) throws AMQException
     {
         boolean atTail = false;
 
@@ -1655,9 +1562,9 @@
                     }
                 }
 
-                if(_lastSeenUpdater.compareAndSet(context, lastSeen, node))
+                if(QueueContext._lastSeenUpdater.compareAndSet(context, lastSeen, node))
                 {
-                    _releasedUpdater.compareAndSet(context, releasedNode, null);
+                    QueueContext._releasedUpdater.compareAndSet(context, releasedNode, null);
                 }
 
                 lastSeen = context._lastSeenEntry;
@@ -1774,7 +1681,10 @@
             }
             else
             {
-                _managedObject.checkForNotification(node.getMessage());
+                if(_managedObject!=null)
+                {
+                    _managedObject.checkForNotification(node.getMessage());
+                }
             }
         }
 
@@ -1969,4 +1879,9 @@
             _flowResumeCapacity = config.getFlowResumeCapacity();
         }
     }
+
+    public String getResourceName()
+    {
+        return _resourceName;
+    }
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java Tue Oct 20 16:23:01 2009
@@ -4,6 +4,7 @@
 import org.apache.qpid.server.message.ServerMessage;
 
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.concurrent.atomic.AtomicLong;
 
 /*
 *
@@ -44,9 +45,7 @@
                 _nextUpdater =
             AtomicReferenceFieldUpdater.newUpdater
             (QueueEntryImpl.class, QueueEntryImpl.class, "_next");
-
-
-
+    private AtomicLong _deletes = new AtomicLong(0L);
 
 
     public SimpleQueueEntryList(AMQQueue queue)
@@ -56,21 +55,77 @@
         _tail = _head;
     }
 
+    
+
     void advanceHead()
     {
+        _deletes.incrementAndGet();
         QueueEntryImpl head = _head.nextNode();
+        boolean deleted = head.isDeleted();
         while(head._next != null && head.isDeleted())
         {
 
+            deleted = true;
             final QueueEntryImpl newhead = head.nextNode();
             if(newhead != null)
             {
-                _nextUpdater.compareAndSet(_head,head, newhead);
+                if(_nextUpdater.compareAndSet(_head,head, newhead))
+                {
+                    _deletes.decrementAndGet();
+                }
             }
             head = _head.nextNode();
         }
+
+        if(!deleted)
+        {
+            deleted = true;
+        }
+
+        if(_deletes.get() > 1000L)
+        {
+            _deletes.set(0L);
+            scavenge();
+        }
     }
 
+    void scavenge()
+    {
+        QueueEntryImpl root = _head;
+        QueueEntryImpl next = root.nextNode();
+
+        do
+        {
+
+
+            while(next._next != null && next.isDeleted())
+            {
+
+                final QueueEntryImpl newhead = next.nextNode();
+                if(newhead != null)
+                {
+                    _nextUpdater.compareAndSet(root,next, newhead);
+                }
+                next = root.nextNode();
+            }
+            if(next._next != null)
+            {
+                if(!next.isDeleted())
+                {
+                    root = next;
+                    next = root.nextNode();
+                }
+            }
+            else
+            {
+                break;
+            }
+
+        } while (next != null && next._next != null);
+
+    }
+
+
 
     public AMQQueue getQueue()
     {

Added: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java?rev=827724&view=auto
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java (added)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java Tue Oct 20 16:23:01 2009
@@ -0,0 +1,68 @@
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.pool.ReadWriteRunnable;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.AMQException;
+import org.apache.log4j.Logger;
+
+
+class SubFlushRunner implements ReadWriteRunnable
+{
+    private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
+
+
+    private final Subscription _sub;
+    private final String _name;
+    private static final long ITERATIONS = SimpleAMQQueue.MAX_ASYNC_DELIVERIES;
+
+    public SubFlushRunner(Subscription sub)
+    {
+        _sub = sub;
+        _name = "SubFlushRunner-"+_sub;
+    }
+
+    public void run()
+    {
+
+       
+        Thread.currentThread().setName(_name);
+
+        boolean complete = false;
+        try
+        {
+            CurrentActor.set(_sub.getLogActor());
+            complete = getQueue().flushSubscription(_sub, ITERATIONS);
+
+        }
+        catch (AMQException e)
+        {
+            _logger.error(e);
+        }
+        finally
+        {
+            CurrentActor.remove();
+        }
+        if (!complete && !_sub.isSuspended())
+        {
+            getQueue().execute(this);
+        }
+
+
+    }
+
+    private SimpleAMQQueue getQueue()
+    {
+        return (SimpleAMQQueue) _sub.getQueue();
+    }
+
+    public boolean isRead()
+    {
+        return false;
+    }
+
+    public boolean isWrite()
+    {
+        return true;
+    }
+}

Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java Tue Oct 20 16:23:01 2009
@@ -35,8 +35,8 @@
 import org.apache.qpid.server.security.access.ACLManager;
 import org.apache.qpid.server.security.auth.database.ConfigurationFilePrincipalDatabaseManager;
 import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
-import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 
 import java.io.File;
 
@@ -51,15 +51,15 @@
 
     public void initialise(int instanceID) throws Exception
     {
-        _rootMessageLogger = new RootMessageLoggerImpl(_configuration, 
+        _rootMessageLogger = new RootMessageLoggerImpl(_configuration,
                                                        new Log4jMessageLogger());
-        
+
         _registryName = String.valueOf(instanceID);
 
         // Set the Actor for current log messages
         CurrentActor.set(new BrokerActor(_registryName, _rootMessageLogger));
 
-        CurrentActor.get().message(BrokerMessages.BRK_1001(QpidProperties.getReleaseVersion(),QpidProperties.getBuildVersion()));        
+        CurrentActor.get().message(BrokerMessages.BRK_1001(QpidProperties.getReleaseVersion(),QpidProperties.getBuildVersion()));
 
         initialiseManagedObjectRegistry();
 
@@ -68,7 +68,7 @@
         _pluginManager = new PluginManager(_configuration.getPluginDirectory());
 
         _accessManager = new ACLManager(_configuration.getSecurityConfiguration(), _pluginManager);
-        
+
         _databaseManager = new ConfigurationFilePrincipalDatabaseManager(_configuration);
 
         _authenticationManager = new PrincipalDatabaseAuthenticationManager(null, null);
@@ -99,10 +99,10 @@
     }
 
     private void initialiseVirtualHosts() throws Exception
-    {        
+    {
         for (String name : _configuration.getVirtualHosts())
         {
-            _virtualHostRegistry.registerVirtualHost(new VirtualHost(_configuration.getVirtualHostConfig(name)));
+            _virtualHostRegistry.registerVirtualHost(new VirtualHostImpl(_configuration.getVirtualHostConfig(name)));
         }
         getVirtualHostRegistry().setDefaultVirtualHostName(_configuration.getDefaultVirtualHost());
     }

Copied: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/PrincipalHolder.java (from r821930, qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/PrincipalHolder.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/PrincipalHolder.java?p2=qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/PrincipalHolder.java&p1=qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/PrincipalHolder.java&r1=821930&r2=827724&rev=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/PrincipalHolder.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/PrincipalHolder.java Tue Oct 20 16:23:01 2009
@@ -18,7 +18,7 @@
  * under the License.
  *
  */
-package org.apache.qpid.server;
+package org.apache.qpid.server.security;
 
 import java.security.Principal;
 

Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/PrincipalHolder.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java Tue Oct 20 16:23:01 2009
@@ -14,9 +14,9 @@
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License.    
+ *  under the License.
+ *
  *
- * 
  */
 package org.apache.qpid.server.security.access;
 
@@ -32,16 +32,12 @@
 import org.apache.log4j.Logger;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.configuration.SecurityConfiguration;
-import org.apache.qpid.server.configuration.ServerConfiguration;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.plugins.PluginManager;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult;
-import org.apache.qpid.server.security.access.plugins.SimpleXML;
+import org.apache.qpid.server.security.PrincipalHolder;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.PrincipalHolder;
 
 public class ACLManager
 {
@@ -79,7 +75,7 @@
     {
         _hostPlugins = configurePlugins(hostConfig);
     }
-    
+
     public Map<String, ACLPlugin> configurePlugins(SecurityConfiguration hostConfig) throws ConfigurationException
     {
         Configuration securityConfig = hostConfig.getConfiguration();
@@ -109,7 +105,7 @@
             }
         }
         return plugins;
-    }    
+    }
 
     public static Logger getLogger()
     {
@@ -132,18 +128,18 @@
             if (result == AuthzResult.DENIED)
             {
                 // Something vetoed the access, we're done
-                return false; 
+                return false;
             }
             else if (result == AuthzResult.ALLOWED)
             {
-                // Remove plugin from global check list since 
+                // Remove plugin from global check list since
                 // host allow overrides global allow
                 remainingPlugins.remove(plugin.getKey());
             }
         }
-        
+
         for (ACLPlugin plugin : remainingPlugins.values())
-        {   
+        {
             result = checker.allowed(plugin);
             if (result == AuthzResult.DENIED)
             {
@@ -271,7 +267,7 @@
 
         });
     }
-    
+
     public boolean authorisePublish(final PrincipalHolder session, final boolean immediate, final boolean mandatory,
             final AMQShortString routingKey, final Exchange e)
     {

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLPlugin.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLPlugin.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLPlugin.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLPlugin.java Tue Oct 20 16:23:01 2009
@@ -24,10 +24,9 @@
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.PrincipalHolder;
+import org.apache.qpid.server.security.PrincipalHolder;
 
 public interface ACLPlugin
 {
@@ -35,13 +34,13 @@
     {
         ALLOWED,
         DENIED,
-        ABSTAIN        
+        ABSTAIN
     }
 
     void setConfiguration(Configuration config) throws ConfigurationException;
 
-    // These return true if the plugin thinks the action should be allowed, and false if not. 
-    
+    // These return true if the plugin thinks the action should be allowed, and false if not.
+
     AuthzResult authoriseBind(PrincipalHolder session, Exchange exch, AMQQueue queue, AMQShortString routingKey);
 
     AuthzResult authoriseCreateExchange(PrincipalHolder session, boolean autoDelete, boolean durable,

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AbstractACLPlugin.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AbstractACLPlugin.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AbstractACLPlugin.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AbstractACLPlugin.java Tue Oct 20 16:23:01 2009
@@ -22,14 +22,13 @@
 
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.security.access.ACLPlugin;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.PrincipalHolder;
+import org.apache.qpid.server.security.PrincipalHolder;
 
 /**
- * This ACLPlugin abstains from all votes. Useful if your plugin only cares about a few operations. 
+ * This ACLPlugin abstains from all votes. Useful if your plugin only cares about a few operations.
  */
 public abstract class AbstractACLPlugin implements ACLPlugin
 {

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java Tue Oct 20 16:23:01 2009
@@ -21,21 +21,19 @@
 package org.apache.qpid.server.security.access.plugins;
 
 import org.apache.commons.configuration.Configuration;
-import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.security.access.ACLPlugin;
+import org.apache.qpid.server.security.PrincipalHolder;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.PrincipalHolder;
 
 public abstract class BasicACLPlugin implements ACLPlugin
 {
 
-    // Returns true or false if the plugin should authorise or deny the request    
+    // Returns true or false if the plugin should authorise or deny the request
     protected abstract AuthzResult getResult();
-    
+
     public AuthzResult authoriseBind(PrincipalHolder session, Exchange exch,
             AMQQueue queue, AMQShortString routingKey)
     {
@@ -51,7 +49,7 @@
     public AuthzResult authoriseConsume(PrincipalHolder session, boolean noAck,
             AMQQueue queue)
     {
-        return getResult();    
+        return getResult();
     }
 
     public AuthzResult authoriseConsume(PrincipalHolder session,

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java Tue Oct 20 16:23:01 2009
@@ -25,15 +25,14 @@
 import org.apache.qpid.framing.AMQShortString;
 
 import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.security.access.ACLPlugin;
 import org.apache.qpid.server.security.access.ACLPluginFactory;
 import org.apache.qpid.server.security.access.AccessResult;
 import org.apache.qpid.server.security.access.Permission;
 import org.apache.qpid.server.security.access.PrincipalPermissions;
+import org.apache.qpid.server.security.PrincipalHolder;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.PrincipalHolder;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -57,7 +56,7 @@
             return plugin;
         }
     };
-    
+
     private Map<String, PrincipalPermissions> _users;
     private final AccessResult GRANTED = new AccessResult(this, AccessResult.AccessStatus.GRANTED);
 
@@ -82,7 +81,7 @@
 
     /**
      * Publish format takes Exchange + Routing Key Pairs
-     * 
+     *
      * @param config
      *            XML Configuration
      */

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java Tue Oct 20 16:23:01 2009
@@ -32,12 +32,11 @@
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.XMLConfiguration;
 import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.security.access.ACLPlugin;
 import org.apache.qpid.server.security.access.ACLPluginFactory;
 import org.apache.qpid.server.security.access.plugins.AbstractACLPlugin;
+import org.apache.qpid.server.security.PrincipalHolder;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.PrincipalHolder;
 import org.apache.qpid.util.NetMatcher;
 
 public class FirewallPlugin extends AbstractACLPlugin
@@ -59,7 +58,7 @@
             return plugin;
         }
     };
-    
+
     public class FirewallRule
     {
 
@@ -71,13 +70,13 @@
         public FirewallRule(String access, List networks, List hostnames)
         {
             _access = (access.equals("allow")) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
-            
+
             if (networks != null && networks.size() > 0)
             {
                 String[] networkStrings = objListToStringArray(networks);
                 _network = new NetMatcher(networkStrings);
             }
-            
+
             if (hostnames != null && hostnames.size() > 0)
             {
                 int i = 0;
@@ -87,7 +86,7 @@
                     _hostnamePatterns[i++] = Pattern.compile(hostname);
                 }
             }
-            
+
         }
 
         private String[] objListToStringArray(List objList)
@@ -149,7 +148,7 @@
 
             thread.run();
             long endTime = System.currentTimeMillis() + DNS_TIMEOUT;
-            
+
             while (System.currentTimeMillis() < endTime && !done.get())
             {
                 try
@@ -183,7 +182,7 @@
         if(!(principalHolder instanceof ProtocolEngine))
         {
             return AuthzResult.ABSTAIN; // We only deal with tcp sessions
-        } 
+        }
 
         ProtocolEngine session = (ProtocolEngine) principalHolder;
 
@@ -237,7 +236,7 @@
             _default = AuthzResult.DENIED;
         }
         CompositeConfiguration finalConfig = new CompositeConfiguration(config);
-        
+
         List subFiles = config.getList("xml[@fileName]");
         for (Object subFile : subFiles)
         {
@@ -245,7 +244,7 @@
         }
 
         // all rules must have an access attribute
-        int numRules = finalConfig.getList("rule[@access]").size(); 
+        int numRules = finalConfig.getList("rule[@access]").size();
         _rules = new FirewallRule[numRules];
         for (int i = 0; i < numRules; i++)
         {

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java Tue Oct 20 16:23:01 2009
@@ -215,14 +215,7 @@
                 _logger.warn("Unable to load access file:" + jmxaccesssFile);
             }
 
-            try
-            {
-                _mbean.register();
-            }
-            catch (AMQException e)
-            {
-                _logger.warn("Unable to register user management MBean");
-            }
+            _mbean.register();
         }
         catch (JMException e)
         {

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java Tue Oct 20 16:23:01 2009
@@ -31,12 +31,12 @@
 {
     protected LogSubject _logSubject;
 
-    public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration hostConfig) throws Exception
+    public void configure(VirtualHost virtualHost) throws Exception
     {
         _logSubject = new MessageStoreLogSubject(virtualHost, this);
         CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_1001(this.getClass().getName()));
     }
-    
+
     public void close() throws Exception
     {
         CurrentActor.get().message(_logSubject,MessageStoreMessages.MST_1003());

Added: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java?rev=827724&view=auto
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java (added)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java Tue Oct 20 16:23:01 2009
@@ -0,0 +1,57 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.store;
+
+import java.nio.ByteBuffer;
+import org.apache.qpid.framing.FieldTable;
+
+public interface ConfigurationRecoveryHandler
+{
+    QueueRecoveryHandler begin(MessageStore store);
+
+    public static interface QueueRecoveryHandler
+    {
+        void queue(String queueName, String owner, FieldTable arguments);
+        ExchangeRecoveryHandler completeQueueRecovery();
+    }
+
+    public static interface ExchangeRecoveryHandler
+    {
+        void exchange(String exchangeName, String type, boolean autoDelete);
+        BindingRecoveryHandler completeExchangeRecovery();
+    }
+
+    public static interface BindingRecoveryHandler
+    {
+        void binding(String exchangeName, String queueName, String bindingKey, ByteBuffer buf);
+        void completeBindingRecovery();
+    }
+
+    public static interface QueueEntryRecoveryHandler
+    {
+        void complete();
+
+        void queueEntry(String queueName, long messageId);
+    }
+
+
+
+}

Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
------------------------------------------------------------------------------
    svn:executable = *



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org