You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2012/03/01 16:42:53 UTC

svn commit: r1295627 [6/12] - in /qpid/branches/rg-amqp-1-0-sandbox/qpid/java: ./ bdbstore/ bdbstore/src/main/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/ bdbsto...

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java Thu Mar  1 15:42:44 2012
@@ -1,12 +1,14 @@
 package org.apache.qpid.server.queue;
 
 import java.util.Map;
+
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.SubscriptionList;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 public abstract class OutOfOrderQueue extends SimpleAMQQueue
 {
+
     protected OutOfOrderQueue(String name, boolean durable, String owner,
                               boolean autoDelete, boolean exclusive, VirtualHost virtualHost,
                               QueueEntryListFactory entryListFactory, Map<String, Object> arguments)
@@ -27,11 +29,8 @@ public abstract class OutOfOrderQueue ex
                 QueueContext context = (QueueContext) subscription.getQueueContext();
                 if(context != null)
                 {
-                    QueueEntry subnode = context._lastSeenEntry;
                     QueueEntry released = context._releasedEntry;
-
-                    while(subnode != null && entry.compareTo(subnode) < 0 && !entry.isAcquired()
-                            && (released == null || released.compareTo(entry) > 0))
+                    while(!entry.isAcquired() && (released == null || released.compareTo(entry) > 0))
                     {
                         if(QueueContext._releasedUpdater.compareAndSet(context,released,entry))
                         {
@@ -39,14 +38,11 @@ public abstract class OutOfOrderQueue ex
                         }
                         else
                         {
-                            subnode = context._lastSeenEntry;
                             released = context._releasedEntry;
                         }
-
                     }
                 }
             }
-
         }
     }
 

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Thu Mar  1 15:42:44 2012
@@ -75,6 +75,11 @@ public interface QueueEntry extends Comp
         {
             return State.AVAILABLE;
         }
+
+        public String toString()
+        {
+            return getState().name();
+        }
     }
 
 
@@ -85,6 +90,11 @@ public interface QueueEntry extends Comp
         {
             return State.DEQUEUED;
         }
+
+        public String toString()
+        {
+            return getState().name();
+        }
     }
 
 
@@ -95,6 +105,11 @@ public interface QueueEntry extends Comp
         {
             return State.DELETED;
         }
+
+        public String toString()
+        {
+            return getState().name();
+        }
     }
 
     public final class ExpiredState extends EntryState
@@ -104,6 +119,11 @@ public interface QueueEntry extends Comp
         {
             return State.EXPIRED;
         }
+
+        public String toString()
+        {
+            return getState().name();
+        }
     }
 
 
@@ -113,6 +133,11 @@ public interface QueueEntry extends Comp
         {
             return State.ACQUIRED;
         }
+
+        public String toString()
+        {
+            return getState().name();
+        }
     }
 
     public final class SubscriptionAcquiredState extends EntryState
@@ -134,6 +159,11 @@ public interface QueueEntry extends Comp
         {
             return _subscription;
         }
+
+        public String toString()
+        {
+            return "{" + getState().name() + " : " + _subscription +"}";
+        }
     }
 
     public final class SubscriptionAssignedState extends EntryState
@@ -155,6 +185,12 @@ public interface QueueEntry extends Comp
         {
             return _subscription;
         }
+
+
+        public String toString()
+        {
+            return "{" + getState().name() + " : " + _subscription +"}";
+        }
     }
 
 

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Thu Mar  1 15:42:44 2012
@@ -35,6 +35,7 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 
 public abstract class QueueEntryImpl implements QueueEntry
@@ -420,7 +421,7 @@ public abstract class QueueEntryImpl imp
             if (rerouteQueues != null && rerouteQueues.size() != 0)
             {
 
-                ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog());
+                ServerTransaction txn = new LocalTransaction(getQueue().getVirtualHost().getMessageStore());
 
                 txn.enqueue(rerouteQueues, message, new ServerTransaction.Action()
                 {
@@ -443,7 +444,8 @@ public abstract class QueueEntryImpl imp
                     {
 
                     }
-                });
+                }, 0L);
+
                 txn.dequeue(currentQueue, message, new ServerTransaction.Action()
                 {
                     public void postCommit()
@@ -456,8 +458,10 @@ public abstract class QueueEntryImpl imp
 
                     }
                 });
-                }
+
+                txn.commit();
             }
+        }
     }
 
     public boolean isQueueDeleted()
@@ -545,4 +549,11 @@ public abstract class QueueEntryImpl imp
         _deliveryCountUpdater.decrementAndGet(this);
     }
 
+    public String toString()
+    {
+        return "QueueEntryImpl{" +
+                "_entryId=" + _entryId +
+                ", _state=" + _state +
+                '}';
+    }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java Thu Mar  1 15:42:44 2012
@@ -35,4 +35,6 @@ public interface QueueEntryList<Q extend
     Q getHead();
 
     void entryDeleted(Q queueEntry);
+    
+    int getPriorities();
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java Thu Mar  1 15:42:44 2012
@@ -22,7 +22,6 @@ package org.apache.qpid.server.queue;
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.pool.ReadWriteRunnable;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.queue.QueueRunner;
 import org.apache.qpid.server.queue.SimpleAMQQueue;
@@ -38,7 +37,7 @@ import java.util.concurrent.atomic.Atomi
  * when straight-through delivery of a message to a subscription isn't
  * possible during the enqueue operation.
  */
-public class QueueRunner implements ReadWriteRunnable
+public class QueueRunner implements Runnable
 {
     private static final Logger _logger = Logger.getLogger(QueueRunner.class);
 
@@ -51,13 +50,11 @@ public class QueueRunner implements Read
 
     private final AtomicInteger _scheduled = new AtomicInteger(IDLE);
 
-    private static final long ITERATIONS = SimpleAMQQueue.MAX_ASYNC_DELIVERIES;
     private final AtomicBoolean _stateChange = new AtomicBoolean();
 
     private final AtomicLong _lastRunAgain = new AtomicLong();
     private final AtomicLong _lastRunTime = new AtomicLong();
 
-    private long _runs;
     private long _continues;
 
     public QueueRunner(SimpleAMQQueue queue)
@@ -65,8 +62,6 @@ public class QueueRunner implements Read
         _queue = queue;
     }
 
-    private int trouble = 0;
-
     public void run()
     {
         if(_scheduled.compareAndSet(SCHEDULED,RUNNING))
@@ -103,16 +98,6 @@ public class QueueRunner implements Read
         }
     }
 
-    public boolean isRead()
-    {
-        return false;
-    }
-
-    public boolean isWrite()
-    {
-        return true;
-    }
-
     public String toString()
     {
         return "QueueRunner-" + _queue.getLogActor();

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Thu Mar  1 15:42:44 2012
@@ -18,22 +18,33 @@
  */
 package org.apache.qpid.server.queue;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.management.JMException;
 import org.apache.log4j.Logger;
-
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQSecurityException;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.pool.ReadWriteRunnable;
 import org.apache.qpid.pool.ReferenceCountingExecutorService;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
-import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.binding.Binding;
 import org.apache.qpid.server.configuration.ConfigStore;
 import org.apache.qpid.server.configuration.ConfiguredObject;
 import org.apache.qpid.server.configuration.QueueConfigType;
 import org.apache.qpid.server.configuration.QueueConfiguration;
 import org.apache.qpid.server.configuration.SessionConfig;
+import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.LogSubject;
@@ -43,8 +54,12 @@ import org.apache.qpid.server.logging.me
 import org.apache.qpid.server.logging.subjects.QueueLogSubject;
 import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.security.AuthorizationHolder;
+import org.apache.qpid.server.subscription.AssignedSubscriptionMessageGroupManager;
+import org.apache.qpid.server.subscription.DefinedGroupMessageGroupManager;
+import org.apache.qpid.server.subscription.MessageGroupManager;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.SubscriptionList;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -52,27 +67,15 @@ import org.apache.qpid.server.txn.LocalT
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
-import javax.management.JMException;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
+public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, MessageGroupManager.SubscriptionResetHelper
 {
     private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
+    private static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key";
+    private static final String QPID_SHARED_MSG_GROUP = "qpid.shared_msg_group";
+    private static final String QPID_DEFAULT_MESSAGE_GROUP = "qpid.default-message-group";
+    private static final String QPID_NO_GROUP = "qpid.no-group";
+    // TODO - should make this configurable at the vhost / broker level
+    private static final int DEFAULT_MAX_GROUPS = 255;
 
 
     private final VirtualHost _virtualHost;
@@ -164,7 +167,7 @@ public class SimpleAMQQueue implements A
     private AtomicInteger _deliveredMessages = new AtomicInteger();
     private AtomicBoolean _stopped = new AtomicBoolean(false);
 
-    private final ConcurrentMap<AMQChannel, Boolean> _blockedChannels = new ConcurrentHashMap<AMQChannel, Boolean>();
+    private final ConcurrentMap<AMQSessionModel, Boolean> _blockedChannels = new ConcurrentHashMap<AMQSessionModel, Boolean>();
 
     private final AtomicBoolean _deleted = new AtomicBoolean(false);
     private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
@@ -190,6 +193,7 @@ public class SimpleAMQQueue implements A
 
     /** the maximum delivery count for each message on this queue or 0 if maximum delivery count is not to be enforced. */
     private int _maximumDeliveryCount = ApplicationRegistry.getInstance().getConfiguration().getMaxDeliveryCount();
+    private final MessageGroupManager _messageGroupManager;
 
     protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String,Object> arguments)
     {
@@ -245,25 +249,15 @@ public class SimpleAMQQueue implements A
         _logSubject = new QueueLogSubject(this);
         _logActor = new QueueActor(this, CurrentActor.get().getRootMessageLogger());
 
-        // Log the correct creation message
-
-        // Extract the number of priorities for this Queue.
-        // Leave it as 0 if we are a SimpleQueueEntryList
-        int priorities = 0;
-        if (entryListFactory instanceof PriorityQueueList.Factory)
-        {
-            priorities = ((PriorityQueueList)_entries).getPriorities();
-        }
-
         // Log the creation of this Queue.
         // The priorities display is toggled on if we set priorities > 0
         CurrentActor.get().message(_logSubject,
                                    QueueMessages.CREATED(String.valueOf(_owner),
-                                                          priorities,
-                                                          _owner != null,
-                                                          autoDelete,
-                                                          durable, !durable,
-                                                          priorities > 0));
+                                                         _entries.getPriorities(),
+                                                         _owner != null,
+                                                         autoDelete,
+                                                         durable, !durable,
+                                                         _entries.getPriorities() > 0));
 
         getConfigStore().addConfiguredObject(this);
 
@@ -277,6 +271,26 @@ public class SimpleAMQQueue implements A
             _logger.error("AMQQueue MBean creation has failed ", e);
         }
 
+        if(arguments != null && arguments.containsKey(QPID_GROUP_HEADER_KEY))
+        {
+            if(arguments.containsKey(QPID_SHARED_MSG_GROUP) && String.valueOf(arguments.get(QPID_SHARED_MSG_GROUP)).equals("1"))
+            {
+                String defaultGroup = String.valueOf(arguments.get(QPID_DEFAULT_MESSAGE_GROUP));
+                _messageGroupManager =
+                        new DefinedGroupMessageGroupManager(String.valueOf(arguments.get(QPID_GROUP_HEADER_KEY)),
+                                defaultGroup == null ? QPID_NO_GROUP : defaultGroup,
+                                this);
+            }
+            else
+            {
+                _messageGroupManager = new AssignedSubscriptionMessageGroupManager(String.valueOf(arguments.get(QPID_GROUP_HEADER_KEY)), DEFAULT_MAX_GROUPS);
+            }
+        }
+        else
+        {
+            _messageGroupManager = null;
+        }
+
         resetNotifications();
 
     }
@@ -292,7 +306,7 @@ public class SimpleAMQQueue implements A
 
     // ------ Getters and Setters
 
-    public void execute(ReadWriteRunnable runnable)
+    public void execute(Runnable runnable)
     {
         _asyncDelivery.execute(runnable);
     }
@@ -491,6 +505,11 @@ public class SimpleAMQQueue implements A
             setExclusiveSubscriber(null);
             subscription.setQueueContext(null);
 
+            if(_messageGroupManager != null)
+            {
+                resetSubPointersForGroups(subscription, true);
+            }
+
             // auto-delete queues must be deleted if there are no remaining subscribers
 
             if (_autoDelete && getDeleteOnNoConsumers() && !subscription.isTransient() && getConsumerCount() == 0  )
@@ -515,6 +534,34 @@ public class SimpleAMQQueue implements A
 
     }
 
+    public void resetSubPointersForGroups(Subscription subscription, boolean clearAssignments)
+    {
+        QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(subscription);
+        if(clearAssignments)
+        {
+            _messageGroupManager.clearAssignments(subscription);
+        }
+
+        if(entry != null)
+        {
+            SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
+            // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
+            while (subscriberIter.advance())
+            {
+                Subscription sub = subscriberIter.getNode().getSubscription();
+
+                // we don't make browsers send the same stuff twice
+                if (sub.seesRequeues())
+                {
+                    updateSubRequeueEntry(sub, entry);
+                }
+            }
+
+            deliverAsync();
+
+        }
+    }
+
     public boolean getDeleteOnNoConsumers()
     {
         return _deleteOnNoConsumers;
@@ -592,7 +639,16 @@ public class SimpleAMQQueue implements A
 
     public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
     {
-        incrementTxnEnqueueStats(message);
+        enqueue(message, false, action);
+    }
+
+    public void enqueue(ServerMessage message, boolean transactional, PostEnqueueAction action) throws AMQException
+    {
+
+        if(transactional)
+        {
+            incrementTxnEnqueueStats(message);
+        }
         incrementQueueCount();
         incrementQueueSize(message);
 
@@ -689,21 +745,20 @@ public class SimpleAMQQueue implements A
         {
             try
             {
-                if (subscriptionReadyAndHasInterest(sub, entry)
-                    && !sub.isSuspended())
-                {
-                    if (!sub.wouldSuspend(entry))
+                if (!sub.isSuspended() 
+                    && subscriptionReadyAndHasInterest(sub, entry) 
+                    && mightAssign(sub, entry)
+                    && !sub.wouldSuspend(entry))
+                {
+                    if (sub.acquires() && !(assign(sub, entry) && entry.acquire(sub)))
+                    {
+                        // restore credit here that would have been taken away by wouldSuspend since we didn't manage
+                        // to acquire the entry for this subscription
+                        sub.restoreCredit(entry);
+                    }
+                    else
                     {
-                        if (sub.acquires() && !entry.acquire(sub))
-                        {
-                            // restore credit here that would have been taken away by wouldSuspend since we didn't manage
-                            // to acquire the entry for this subscription
-                            sub.restoreCredit(entry);
-                        }
-                        else
-                        {
-                            deliverMessage(sub, entry, false);
-                        }
+                        deliverMessage(sub, entry, false);
                     }
                 }
             }
@@ -714,6 +769,20 @@ public class SimpleAMQQueue implements A
         }
     }
 
+    private boolean assign(final Subscription sub, final QueueEntry entry)
+    {
+        return _messageGroupManager == null || _messageGroupManager.acceptMessage(sub, entry);
+    }
+
+
+    private boolean mightAssign(final Subscription sub, final QueueEntry entry)
+    {
+        if(_messageGroupManager == null || !sub.acquires())
+            return true;
+        Subscription assigned = _messageGroupManager.getAssignedSubscription(entry);
+        return (assigned == null) || (assigned == sub);
+    }
+
     protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
     {
         // This method is only required for queues which mess with ordering
@@ -739,13 +808,8 @@ public class SimpleAMQQueue implements A
 
     private void incrementTxnEnqueueStats(final ServerMessage message)
     {
-        SessionConfig session = message.getSessionConfig();
-
-        if(session !=null && session.isTransactional())
-        {
-            _msgTxnEnqueues.incrementAndGet();
-            _byteTxnEnqueues.addAndGet(message.getSize());
-        }
+        _msgTxnEnqueues.incrementAndGet();
+        _byteTxnEnqueues.addAndGet(message.getSize());
     }
 
     private void incrementTxnDequeueStats(QueueEntry entry)
@@ -1057,6 +1121,8 @@ public class SimpleAMQQueue implements A
         public boolean filterComplete();
     }
 
+
+
     public List<QueueEntry> getMessagesOnTheQueue(final long fromMessageId, final long toMessageId)
     {
         return getMessagesOnTheQueue(new QueueEntryFilter()
@@ -1111,6 +1177,24 @@ public class SimpleAMQQueue implements A
 
     }
 
+    public void visit(final Visitor visitor)
+    {
+        QueueEntryIterator queueListIterator = _entries.iterator();
+
+        while(queueListIterator.advance())
+        {
+            QueueEntry node = queueListIterator.getNode();
+
+            if(!node.isDispensed())
+            {
+                if(visitor.visit(node))
+                {
+                    break;
+                }
+            }
+        }
+    }
+
     /**
      * Returns a list of QueEntries from a given range of queue positions, eg messages 5 to 10 on the queue.
      *
@@ -1487,7 +1571,7 @@ public class SimpleAMQQueue implements A
                                         {
 
                                         }
-                                    });
+                                    }, 0L);
                         txn.dequeue(this, entry.getMessage(),
                                     new ServerTransaction.Action()
                                     {
@@ -1565,7 +1649,7 @@ public class SimpleAMQQueue implements A
         }
     }
 
-    public void checkCapacity(AMQChannel channel)
+    public void checkCapacity(AMQSessionModel channel)
     {
         if(_capacity != 0l)
         {
@@ -1575,10 +1659,9 @@ public class SimpleAMQQueue implements A
                 //Overfull log message
                 _logActor.message(_logSubject, QueueMessages.OVERFULL(_atomicQueueSize.get(), _capacity));
 
-                if(_blockedChannels.putIfAbsent(channel, Boolean.TRUE)==null)
-                {
-                    channel.block(this);
-                }
+                _blockedChannels.putIfAbsent(channel, Boolean.TRUE);
+
+                channel.block(this);
 
                 if(_atomicQueueSize.get() <= _flowResumeCapacity)
                 {
@@ -1610,7 +1693,7 @@ public class SimpleAMQQueue implements A
                 }
 
 
-                for(AMQChannel c : _blockedChannels.keySet())
+                for(AMQSessionModel c : _blockedChannels.keySet())
                 {
                     c.unblock(this);
                     _blockedChannels.remove(c);
@@ -1752,11 +1835,11 @@ public class SimpleAMQQueue implements A
 
             if (node != null && node.isAvailable())
             {
-                if (sub.hasInterest(node))
+                if (sub.hasInterest(node) && mightAssign(sub, node))
                 {
                     if (!sub.wouldSuspend(node))
                     {
-                        if (sub.acquires() && !node.acquire(sub))
+                        if (sub.acquires() && !(assign(sub, node) && node.acquire(sub)))
                         {
                             // restore credit here that would have been taken away by wouldSuspend since we didn't manage
                             // to acquire the entry for this subscription
@@ -1813,7 +1896,8 @@ public class SimpleAMQQueue implements A
             QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);
 
             boolean expired = false;
-            while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node)))
+            while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node) ||
+                                    !mightAssign(sub,node)))
             {
                 if (expired)
                 {
@@ -1841,6 +1925,19 @@ public class SimpleAMQQueue implements A
         }
     }
 
+    public boolean isEntryAheadOfSubscription(QueueEntry entry, Subscription sub)
+    {
+        QueueContext context = (QueueContext) sub.getQueueContext();
+        if(context != null)
+        {
+            QueueEntry releasedNode = context._releasedEntry;
+            return releasedNode == null || releasedNode.compareTo(entry) < 0;
+        }
+        else
+        {
+            return false;
+        }
+    }
 
     /**
      * Used by queue Runners to asynchronously deliver messages to consumers.

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java Thu Mar  1 15:42:44 2012
@@ -185,6 +185,11 @@ public class SimpleQueueEntryList implem
         advanceHead();
     }
 
+    public int getPriorities()
+    {
+        return 0;
+    }
+
     static class Factory implements QueueEntryListFactory
     {
 

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java Thu Mar  1 15:42:44 2012
@@ -51,13 +51,11 @@ public class SortedQueueEntryList implem
         _propertyName = propertyName;
     }
 
-    @Override
     public AMQQueue getQueue()
     {
         return _queue;
     }
 
-    @Override
     public SortedQueueEntryImpl add(final ServerMessage message)
     {
         synchronized(_lock)
@@ -286,7 +284,6 @@ public class SortedQueueEntryList implem
         return (node == null ? Colour.BLACK : node.getColour()) == colour;
     }
 
-    @Override
     public SortedQueueEntryImpl next(final SortedQueueEntryImpl node)
     {
         synchronized(_lock)
@@ -316,13 +313,11 @@ public class SortedQueueEntryList implem
         }
     }
 
-    @Override
     public QueueEntryIterator<SortedQueueEntryImpl> iterator()
     {
         return new QueueEntryIteratorImpl(_head);
     }
 
-    @Override
     public SortedQueueEntryImpl getHead()
     {
         return _head;
@@ -333,7 +328,6 @@ public class SortedQueueEntryList implem
         return _root;
     }
 
-    @Override
     public void entryDeleted(final SortedQueueEntryImpl entry)
     {
         synchronized(_lock)
@@ -431,6 +425,11 @@ public class SortedQueueEntryList implem
         }
     }
 
+    public int getPriorities()
+    {
+        return 0;
+    }
+
     /**
      * Swaps the position of the node in the tree with it's successor
      * (that is the node with the next highest key)

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java Thu Mar  1 15:42:44 2012
@@ -21,18 +21,17 @@ package org.apache.qpid.server.queue;
  */
 
 
-import org.apache.qpid.pool.ReadWriteRunnable;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.AMQException;
-import org.apache.log4j.Logger;
-
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.subscription.Subscription;
+
 
-class SubFlushRunner implements ReadWriteRunnable
+class SubFlushRunner implements Runnable
 {
     private static final Logger _logger = Logger.getLogger(SubFlushRunner.class);
 
@@ -90,16 +89,6 @@ class SubFlushRunner implements ReadWrit
         return (SimpleAMQQueue) _sub.getQueue();
     }
 
-    public boolean isRead()
-    {
-        return false;
-    }
-
-    public boolean isWrite()
-    {
-        return true;
-    }
-
     public String toString()
     {
         return "SubFlushRunner-" + _sub.getLogActor();

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java Thu Mar  1 15:42:44 2012
@@ -148,7 +148,7 @@ public abstract class ApplicationRegistr
 
         BrokerConfig broker = new BrokerConfigAdapter(instance);
 
-        SystemConfig system = (SystemConfig) store.getRoot();
+        SystemConfig system = store.getRoot();
         system.addBroker(broker);
         instance.setBroker(broker);
 

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java Thu Mar  1 15:42:44 2012
@@ -165,7 +165,6 @@ public class BrokerConfigAdapter impleme
     /**
      * @see org.apache.qpid.server.configuration.BrokerConfig#getFeatures()
      */
-    @Override
     public List<String> getFeatures()
     {
         final List<String> features = new ArrayList<String>();
@@ -176,4 +175,16 @@ public class BrokerConfigAdapter impleme
 
         return Collections.unmodifiableList(features);
     }
+
+    @Override
+    public String toString()
+    {
+        return "BrokerConfigAdapter{" +
+               "_id=" + _id +
+               ", _system=" + _system +
+               ", _vhosts=" + _vhosts +
+               ", _createTime=" + _createTime +
+               ", _federationTag='" + _federationTag + '\'' +
+               '}';
+    }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java Thu Mar  1 15:42:44 2012
@@ -32,11 +32,9 @@ import static org.apache.qpid.server.sec
 
 import java.net.SocketAddress;
 import java.security.Principal;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
 
 import javax.security.auth.Subject;
 
@@ -192,6 +190,15 @@ public class SecurityManager
         return _logger;
     }
 
+    private static class CachedPropertiesMap extends LinkedHashMap<String, PublishAccessCheck>
+    {
+        @Override
+        protected boolean removeEldestEntry(Entry<String, PublishAccessCheck> eldest)
+        {
+            return size() >= 200;
+        }
+    }
+
     private abstract class AccessCheck
     {
         abstract Result allowed(SecurityPlugin plugin);
@@ -204,56 +211,61 @@ public class SecurityManager
             return true;
         }
 
-        HashMap<String, SecurityPlugin> remainingPlugins = new HashMap<String, SecurityPlugin>(_globalPlugins);
+        Map<String, SecurityPlugin> remainingPlugins = _globalPlugins.isEmpty()
+                ? Collections.<String, SecurityPlugin>emptyMap()
+                : _hostPlugins.isEmpty() ? _globalPlugins : new HashMap<String, SecurityPlugin>(_globalPlugins);
 		
-		for (Entry<String, SecurityPlugin> hostEntry : _hostPlugins.entrySet())
+		if(!_hostPlugins.isEmpty())
         {
-		    // Create set of global only plugins
-			SecurityPlugin globalPlugin = remainingPlugins.get(hostEntry.getKey());
-			if (globalPlugin != null)
-			{
-				remainingPlugins.remove(hostEntry.getKey());
-			}
-			
-            Result host = checker.allowed(hostEntry.getValue());
-			
-			if (host == Result.DENIED)
-			{
-				// Something vetoed the access, we're done
-				return false;
-			}
-            
-			// host allow overrides global allow, so only check global on abstain or defer
-			if (host != Result.ALLOWED)
-			{
-				if (globalPlugin == null)
-				{
-				    if (host == Result.DEFER)
-				    {
-				        host = hostEntry.getValue().getDefault();
-                    }
-                    if (host == Result.DENIED)
+            for (Entry<String, SecurityPlugin> hostEntry : _hostPlugins.entrySet())
+            {
+                // Create set of global only plugins
+                SecurityPlugin globalPlugin = remainingPlugins.get(hostEntry.getKey());
+                if (globalPlugin != null)
+                {
+                    remainingPlugins.remove(hostEntry.getKey());
+                }
+
+                Result host = checker.allowed(hostEntry.getValue());
+
+                if (host == Result.DENIED)
+                {
+                    // Something vetoed the access, we're done
+                    return false;
+                }
+
+                // host allow overrides global allow, so only check global on abstain or defer
+                if (host != Result.ALLOWED)
+                {
+                    if (globalPlugin == null)
                     {
-                        return false;
+                        if (host == Result.DEFER)
+                        {
+                            host = hostEntry.getValue().getDefault();
+                        }
+                        if (host == Result.DENIED)
+                        {
+                            return false;
+                        }
                     }
-				}
-				else
-				{
-				    Result global = checker.allowed(globalPlugin);
-					if (global == Result.DEFER)
-					{
-					    global = globalPlugin.getDefault();
-					}
-					if (global == Result.ABSTAIN && host == Result.DEFER)
-					{
-					    global = hostEntry.getValue().getDefault();
-					}
-					if (global == Result.DENIED)
+                    else
                     {
-                        return false;
+                        Result global = checker.allowed(globalPlugin);
+                        if (global == Result.DEFER)
+                        {
+                            global = globalPlugin.getDefault();
+                        }
+                        if (global == Result.ABSTAIN && host == Result.DEFER)
+                        {
+                            global = hostEntry.getValue().getDefault();
+                        }
+                        if (global == Result.DENIED)
+                        {
+                            return false;
+                        }
                     }
-				}
-			}
+                }
+            }
         }
 
         for (SecurityPlugin plugin : remainingPlugins.values())
@@ -371,15 +383,41 @@ public class SecurityManager
         });
     }
 
-    public boolean authorisePublish(final boolean immediate, final String routingKey, final String exchangeName)
+
+    private ConcurrentHashMap<String, ConcurrentHashMap<String, PublishAccessCheck>> _immediatePublishPropsCache
+            = new ConcurrentHashMap<String, ConcurrentHashMap<String, PublishAccessCheck>>();
+    private ConcurrentHashMap<String, ConcurrentHashMap<String, PublishAccessCheck>> _publishPropsCache
+            = new ConcurrentHashMap<String, ConcurrentHashMap<String, PublishAccessCheck>>();
+
+    public boolean authorisePublish(final boolean immediate, String routingKey, String exchangeName)
     {
-        return checkAllPlugins(new AccessCheck()
+        if(routingKey == null)
         {
-            Result allowed(SecurityPlugin plugin)
+            routingKey = "";
+        }
+        if(exchangeName == null)
+        {
+            exchangeName = "";
+        }
+        PublishAccessCheck check;
+        ConcurrentHashMap<String, ConcurrentHashMap<String, PublishAccessCheck>> cache =
+                immediate ? _immediatePublishPropsCache : _publishPropsCache;
+
+        ConcurrentHashMap<String, PublishAccessCheck> exchangeMap = cache.get(exchangeName);
+        if(exchangeMap == null)
+        {
+            cache.putIfAbsent(exchangeName, new ConcurrentHashMap<String, PublishAccessCheck>());
+            exchangeMap = cache.get(exchangeName);
+        }
+
+            check = exchangeMap.get(routingKey);
+            if(check == null)
             {
-                return plugin.authorise(PUBLISH, EXCHANGE, new ObjectProperties(exchangeName, routingKey, immediate));
+                check = new PublishAccessCheck(new ObjectProperties(exchangeName, routingKey, immediate));
+                exchangeMap.put(routingKey, check);
             }
-        });
+
+        return checkAllPlugins(check);
     }
 
     public boolean authorisePurge(final AMQQueue queue)
@@ -413,4 +451,19 @@ public class SecurityManager
 
         return current;
     }
+
+    private class PublishAccessCheck extends AccessCheck
+    {
+        private final ObjectProperties _props;
+
+        public PublishAccessCheck(ObjectProperties props)
+        {
+            _props = props;
+        }
+
+        Result allowed(SecurityPlugin plugin)
+        {
+            return plugin.authorise(PUBLISH, EXCHANGE, _props);
+        }
+    }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java Thu Mar  1 15:42:44 2012
@@ -18,10 +18,7 @@
  */
 package org.apache.qpid.server.security.access;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.qpid.framing.AMQShortString;
@@ -35,7 +32,7 @@ import org.apache.qpid.server.queue.AMQQ
  * {@link #equals(Object)} and {@link #hashCode()} are intended for use in maps. This is due to the wildcard matching
  * described above.
  */
-public class ObjectProperties extends HashMap<ObjectProperties.Property, String>
+public class ObjectProperties
 {
     /** serialVersionUID */
     private static final long serialVersionUID = -1356019341374170495L;
@@ -93,7 +90,9 @@ public class ObjectProperties extends Ha
 			return properties;
 		}
     }
-	
+
+    private final EnumMap<Property, String> _properties = new EnumMap<Property, String>(Property.class);
+
 	public static List<String> getAllPropertyNames()
     {
 		List<String> properties = new ArrayList<String>();		
@@ -113,7 +112,7 @@ public class ObjectProperties extends Ha
     {
         super();
         
-        putAll(copy);
+        _properties.putAll(copy._properties);
     }
     
     public ObjectProperties(String name)
@@ -231,7 +230,7 @@ public class ObjectProperties extends Ha
 	public List<String> getPropertyNames()
     {
 		List<String> properties = new ArrayList<String>();		
-		for (Property property : keySet())
+		for (Property property : _properties.keySet())
 		{
 			properties.add(property.getName());
 		}
@@ -240,17 +239,22 @@ public class ObjectProperties extends Ha
     
     public Boolean isSet(Property key)
     {
-        return containsKey(key) && Boolean.valueOf(get(key));
+        return _properties.containsKey(key) && Boolean.valueOf(_properties.get(key));
     }
-    
+
+    public String get(Property key)
+    {
+        return _properties.get(key);
+    }
+
     public String getName()
     {
-        return get(Property.NAME);
+        return _properties.get(Property.NAME);
     }
     
     public void setName(String name)
     {
-        put(Property.NAME, name);
+        _properties.put(Property.NAME, name);
     }
     
     public void setName(AMQShortString name)
@@ -262,39 +266,38 @@ public class ObjectProperties extends Ha
     {
         return put(key, value == null ? "" : value.asString());
     }
-    
-    @Override
+
     public String put(Property key, String value)
     {
-        return super.put(key, value == null ? "" : value.trim());
+        return _properties.put(key, value == null ? "" : value.trim());
     }
     
     public void put(Property key, Boolean value)
     {
         if (value != null)
         {
-            super.put(key, Boolean.toString(value));
+            _properties.put(key, Boolean.toString(value));
         }
     }
     
     public boolean matches(ObjectProperties properties)
     {
-        if (properties.keySet().isEmpty())
+        if (properties._properties.keySet().isEmpty())
         {
             return true;
         }
         
-        if (!keySet().containsAll(properties.keySet()))
+        if (!_properties.keySet().containsAll(properties._properties.keySet()))
         {
             return false;
         }
         
-        for (Map.Entry<Property,String> entry : properties.entrySet())
+        for (Map.Entry<Property,String> entry : properties._properties.entrySet())
         {
             Property key = entry.getKey();
             String ruleValue = entry.getValue();
             
-            String thisValue = get(key);
+            String thisValue = _properties.get(key);
 
             if (!valueMatches(thisValue, ruleValue)) 
             {
@@ -315,4 +318,29 @@ public class ObjectProperties extends Ha
                         && thisValue.length() > ruleValue.length()
                         && thisValue.startsWith(ruleValue.substring(0, ruleValue.length() - 2)));
     }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        ObjectProperties that = (ObjectProperties) o;
+
+        if (_properties != null ? !_properties.equals(that._properties) : that._properties != null) return false;
+
+        return true;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return _properties != null ? _properties.hashCode() : 0;
+    }
+
+    @Override
+    public String toString()
+    {
+        return _properties.toString();
+    }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java Thu Mar  1 15:42:44 2012
@@ -21,6 +21,9 @@
 package org.apache.qpid.server.store;
 
 import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.UUID;
+
 import org.apache.qpid.framing.FieldTable;
 
 public interface ConfigurationRecoveryHandler
@@ -42,7 +45,19 @@ public interface ConfigurationRecoveryHa
     public static interface BindingRecoveryHandler
     {
         void binding(String exchangeName, String queueName, String bindingKey, ByteBuffer buf);
-        void completeBindingRecovery();
+        BrokerLinkRecoveryHandler completeBindingRecovery();
+    }
+    
+    public static interface BrokerLinkRecoveryHandler
+    {
+        BridgeRecoveryHandler brokerLink(UUID id, long createTime, Map<String,String> arguments);
+        void completeBrokerLinkRecovery();
+    }
+    
+    public static interface BridgeRecoveryHandler
+    {
+        void bridge(UUID id, long createTime, Map<String,String> arguments);
+        void completeBridgeRecoveryForLink();
     }
 
     public static interface QueueEntryRecoveryHandler

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java Thu Mar  1 15:42:44 2012
@@ -21,7 +21,9 @@
 package org.apache.qpid.server.store;
 
 import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.lang.ref.SoftReference;
@@ -36,7 +38,10 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Types;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -47,11 +52,14 @@ import org.apache.qpid.AMQStoreException
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.federation.Bridge;
+import org.apache.qpid.server.federation.BrokerLink;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
 import org.apache.qpid.server.logging.messages.MessageStoreMessages;
 import org.apache.qpid.server.logging.messages.TransactionLogMessages;
+import org.apache.qpid.server.message.EnqueableMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 
 /**
@@ -60,7 +68,7 @@ import org.apache.qpid.server.queue.AMQQ
  * 
  * TODO extract the SQL statements into a generic JDBC store
  */
-public class DerbyMessageStore implements MessageStore
+public class DerbyMessageStore implements MessageStore, DurableConfigurationStore
 {
 
     private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class);
@@ -80,6 +88,10 @@ public class DerbyMessageStore implement
     private static final String META_DATA_TABLE_NAME = "QPID_META_DATA";
     private static final String MESSAGE_CONTENT_TABLE_NAME = "QPID_MESSAGE_CONTENT";
 
+    private static final String LINKS_TABLE_NAME = "QPID_LINKS";
+    private static final String BRIDGES_TABLE_NAME = "QPID_BRIDGES";
+
+
     private static final int DB_VERSION = 3;
 
 
@@ -135,6 +147,49 @@ public class DerbyMessageStore implement
     private static final String DELETE_FROM_META_DATA = "DELETE FROM " + META_DATA_TABLE_NAME + " WHERE message_id = ?";
     private static final String SELECT_ALL_FROM_META_DATA = "SELECT message_id, meta_data FROM " + META_DATA_TABLE_NAME;
 
+    private static final String CREATE_LINKS_TABLE =
+            "CREATE TABLE "+LINKS_TABLE_NAME+" ( id_lsb bigint not null,"
+                                            + " id_msb bigint not null,"
+                                             + " create_time bigint not null,"
+                                             + " arguments blob,  PRIMARY KEY ( id_lsb, id_msb ))";
+    private static final String SELECT_FROM_LINKS =
+            "SELECT create_time, arguments FROM " + LINKS_TABLE_NAME + " WHERE id_lsb = ? and id_msb";
+    private static final String DELETE_FROM_LINKS = "DELETE FROM " + LINKS_TABLE_NAME 
+                                                    + " WHERE id_lsb = ? and id_msb = ?";
+    private static final String SELECT_ALL_FROM_LINKS = "SELECT id_lsb, id_msb, create_time, "
+                                                        + "arguments FROM " + LINKS_TABLE_NAME;
+    private static final String FIND_LINK = "SELECT id_lsb, id_msb FROM " + LINKS_TABLE_NAME + " WHERE id_lsb = ? and"
+                                            + " id_msb = ?";
+    private static final String INSERT_INTO_LINKS = "INSERT INTO " + LINKS_TABLE_NAME + "( id_lsb, "
+                                                  + "id_msb, create_time, arguments ) values (?, ?, ?, ?)";
+
+
+    private static final String CREATE_BRIDGES_TABLE =
+            "CREATE TABLE "+BRIDGES_TABLE_NAME+" ( id_lsb bigint not null,"
+            + " id_msb bigint not null,"
+            + " create_time bigint not null,"
+            + " link_id_lsb bigint not null,"
+            + " link_id_msb bigint not null,"
+            + " arguments blob,  PRIMARY KEY ( id_lsb, id_msb ))";
+    private static final String SELECT_FROM_BRIDGES =
+            "SELECT create_time, link_id_lsb, link_id_msb, arguments FROM " 
+            + BRIDGES_TABLE_NAME + " WHERE id_lsb = ? and id_msb = ?";
+    private static final String DELETE_FROM_BRIDGES = "DELETE FROM " + BRIDGES_TABLE_NAME 
+                                                      + " WHERE id_lsb = ? and id_msb = ?";
+    private static final String SELECT_ALL_FROM_BRIDGES = "SELECT id_lsb, id_msb, " 
+                                                          + " create_time," 
+                                                          + " link_id_lsb, link_id_msb, "
+                                                        + "arguments FROM " + BRIDGES_TABLE_NAME
+                                                        + " WHERE link_id_lsb = ? and link_id_msb = ?";
+    private static final String FIND_BRIDGE = "SELECT id_lsb, id_msb FROM " + BRIDGES_TABLE_NAME +
+                                              " WHERE id_lsb = ? and id_msb = ?";
+    private static final String INSERT_INTO_BRIDGES = "INSERT INTO " + BRIDGES_TABLE_NAME + "( id_lsb, id_msb, "
+                                                    + "create_time, "
+                                                    + "link_id_lsb, link_id_msb, "
+                                                    + "arguments )"
+                                                    + " values (?, ?, ?, ?, ?, ?)";
+
+
     private static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006";
 
 
@@ -197,12 +252,16 @@ public class DerbyMessageStore implement
                           Configuration storeConfiguration,
                           LogSubject logSubject) throws Exception
     {
-        CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED(this.getClass().getName()));
-
         if(!_configured)
         {
 
             _logSubject = logSubject;
+        }
+
+        CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED(this.getClass().getName()));
+
+        if(!_configured)
+        {
 
             commonConfiguration(name, storeConfiguration, logSubject);
             _configured = true;
@@ -219,6 +278,11 @@ public class DerbyMessageStore implement
                           Configuration storeConfiguration,
                           LogSubject logSubject) throws Exception
     {
+
+        if(!_configured)
+        {
+            _logSubject = logSubject;
+        }
         CurrentActor.get().message(_logSubject, TransactionLogMessages.CREATED(this.getClass().getName()));
 
         if(!_configured)
@@ -283,6 +347,8 @@ public class DerbyMessageStore implement
         createQueueEntryTable(conn);
         createMetaDataTable(conn);
         createMessageContentTable(conn);
+        createLinkTable(conn);
+        createBridgeTable(conn);
 
         conn.close();
     }
@@ -419,6 +485,40 @@ public class DerbyMessageStore implement
 
     }
 
+    private void createLinkTable(final Connection conn) throws SQLException
+    {
+        if(!tableExists(LINKS_TABLE_NAME, conn))
+        {
+            Statement stmt = conn.createStatement();
+            try
+            {
+                stmt.execute(CREATE_LINKS_TABLE);
+            }
+            finally
+            {
+                stmt.close();
+            }
+        }
+    }
+
+
+    private void createBridgeTable(final Connection conn) throws SQLException
+    {
+        if(!tableExists(BRIDGES_TABLE_NAME, conn))
+        {
+            Statement stmt = conn.createStatement();
+            try
+            {
+                stmt.execute(CREATE_BRIDGES_TABLE);
+            }
+            finally
+            {
+                stmt.close();
+            }
+        }
+    }
+
+
 
 
     private boolean tableExists(final String tableName, final Connection conn) throws SQLException
@@ -459,7 +559,8 @@ public class DerbyMessageStore implement
             List<String> exchanges = loadExchanges(erh);
             ConfigurationRecoveryHandler.BindingRecoveryHandler brh = erh.completeExchangeRecovery();
             recoverBindings(brh, exchanges);
-            brh.completeBindingRecovery();
+            ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery();
+            recoverBrokerLinks(lrh);
         }
         catch (SQLException e)
         {
@@ -470,6 +571,144 @@ public class DerbyMessageStore implement
 
     }
 
+    private void recoverBrokerLinks(final ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh)
+            throws SQLException
+    {
+        _logger.info("Recovering broker links...");
+
+        Connection conn = null;
+        try
+        {
+            conn = newAutoCommitConnection();
+
+            PreparedStatement stmt = conn.prepareStatement(SELECT_ALL_FROM_LINKS);
+
+            try
+            {
+                ResultSet rs = stmt.executeQuery();
+
+                try
+                {
+
+                    while(rs.next())
+                    {
+                        UUID id  = new UUID(rs.getLong(2), rs.getLong(1));
+                        long createTime = rs.getLong(3);
+                        Blob argumentsAsBlob = rs.getBlob(4);
+
+                        byte[] dataAsBytes = argumentsAsBlob.getBytes(1,(int) argumentsAsBlob.length());
+                        
+                        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dataAsBytes));
+                        int size = dis.readInt();
+                        
+                        Map<String,String> arguments = new HashMap<String, String>();
+                        
+                        for(int i = 0; i < size; i++)
+                        {
+                            arguments.put(dis.readUTF(), dis.readUTF());
+                        }
+
+                        ConfigurationRecoveryHandler.BridgeRecoveryHandler brh = lrh.brokerLink(id, createTime, arguments);
+
+                        recoverBridges(brh, id);
+
+                    }
+                }
+                catch (IOException e)
+                {
+                    throw new SQLException(e.getMessage(), e);
+                }
+                finally
+                {
+                    rs.close();
+                }
+            }
+            finally
+            {
+                stmt.close();
+            }
+
+        }
+        finally
+        {
+            if(conn != null)
+            {
+                conn.close();
+            }
+        }
+
+    }
+
+    private void recoverBridges(final ConfigurationRecoveryHandler.BridgeRecoveryHandler brh, final UUID linkId)
+            throws SQLException
+    {
+        _logger.info("Recovering bridges for link " + linkId + "...");
+
+        Connection conn = null;
+        try
+        {
+            conn = newAutoCommitConnection();
+
+            PreparedStatement stmt = conn.prepareStatement(SELECT_ALL_FROM_BRIDGES);
+            stmt.setLong(1, linkId.getLeastSignificantBits());
+            stmt.setLong(2, linkId.getMostSignificantBits());
+
+
+            try
+            {
+                ResultSet rs = stmt.executeQuery();
+
+                try
+                {
+
+                    while(rs.next())
+                    {
+                        UUID id  = new UUID(rs.getLong(2), rs.getLong(1));
+                        long createTime = rs.getLong(3);
+                        Blob argumentsAsBlob = rs.getBlob(6);
+
+                        byte[] dataAsBytes = argumentsAsBlob.getBytes(1,(int) argumentsAsBlob.length());
+
+                        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dataAsBytes));
+                        int size = dis.readInt();
+
+                        Map<String,String> arguments = new HashMap<String, String>();
+
+                        for(int i = 0; i < size; i++)
+                        {
+                            arguments.put(dis.readUTF(), dis.readUTF());
+                        }
+
+                        brh.bridge(id, createTime, arguments);
+
+                    }
+                    brh.completeBridgeRecoveryForLink();
+                }
+                catch (IOException e)
+                {
+                    throw new SQLException(e.getMessage(), e);
+                }
+                finally
+                {
+                    rs.close();
+                }
+            }
+            finally
+            {
+                stmt.close();
+            }
+
+        }
+        finally
+        {
+            if(conn != null)
+            {
+                conn.close();
+            }
+        }
+
+    }
+
     private void loadQueues(ConfigurationRecoveryHandler.QueueRecoveryHandler qrh) throws SQLException
     {
         Connection conn = newAutoCommitConnection();
@@ -697,7 +936,7 @@ public class DerbyMessageStore implement
 
                     if (results == 0)
                     {
-                        throw new RuntimeException("Message metadata not found for message id " + messageId);
+                        _logger.warn("Message metadata not found for message id " + messageId);
                     }
 
                     if (_logger.isDebugEnabled())
@@ -1180,6 +1419,233 @@ public class DerbyMessageStore implement
 
     }
 
+    public void createBrokerLink(final BrokerLink link) throws AMQStoreException
+    {
+        _logger.debug("public void createBrokerLink(BrokerLink = " + link + "): called");
+
+        if (_state != State.RECOVERING)
+        {
+            try
+            {
+                Connection conn = newAutoCommitConnection();
+
+                PreparedStatement stmt = conn.prepareStatement(FIND_LINK);
+                try
+                {
+                    
+                    stmt.setLong(1, link.getId().getLeastSignificantBits());
+                    stmt.setLong(2, link.getId().getMostSignificantBits());
+                    ResultSet rs = stmt.executeQuery();
+                    try
+                    {
+
+                        // If we don't have any data in the result set then we can add this queue
+                        if (!rs.next())
+                        {
+                            PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_LINKS);
+
+                            try
+                            {
+                                
+                                insertStmt.setLong(1, link.getId().getLeastSignificantBits());
+                                insertStmt.setLong(2, link.getId().getMostSignificantBits());
+                                insertStmt.setLong(3, link.getCreateTime());
+
+                                byte[] argumentBytes = convertStringMapToBytes(link.getArguments());
+                                ByteArrayInputStream bis = new ByteArrayInputStream(argumentBytes);
+
+                                insertStmt.setBinaryStream(4,bis,argumentBytes.length);
+
+                                insertStmt.execute();
+                            }
+                            finally
+                            {
+                                insertStmt.close();
+                            }
+                        }
+                    }
+                    finally
+                    {
+                        rs.close();
+                    }
+                }
+                finally
+                {
+                    stmt.close();
+                }
+                conn.close();
+
+            }
+            catch (SQLException e)
+            {
+                throw new AMQStoreException("Error writing " + link + " to database: " + e.getMessage(), e);
+            }
+        }
+    }
+
+    private byte[] convertStringMapToBytes(final Map<String, String> arguments) throws AMQStoreException
+    {
+        byte[] argumentBytes;
+        if(arguments == null)
+        {
+            argumentBytes = new byte[0];
+        }
+        else
+        {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            DataOutputStream dos = new DataOutputStream(bos);
+
+
+            try
+            {
+                dos.writeInt(arguments.size());
+                for(Map.Entry<String,String> arg : arguments.entrySet())
+                {
+                    dos.writeUTF(arg.getKey());
+                    dos.writeUTF(arg.getValue());
+                }
+            }
+            catch (IOException e)
+            {
+                // This should never happen
+                throw new AMQStoreException(e.getMessage(), e);
+            }
+            argumentBytes = bos.toByteArray();
+        }
+        return argumentBytes;
+    }
+
+    public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException
+    {
+        _logger.debug("public void deleteBrokerLink( " + link + "): called");
+        Connection conn = null;
+        PreparedStatement stmt = null;
+        try
+        {
+            conn = newAutoCommitConnection();
+            stmt = conn.prepareStatement(DELETE_FROM_LINKS);
+            stmt.setLong(1, link.getId().getLeastSignificantBits());
+            stmt.setLong(2, link.getId().getMostSignificantBits());
+            int results = stmt.executeUpdate();
+
+            if (results == 0)
+            {
+                throw new AMQStoreException("Link " + link + " not found");
+            }
+        }
+        catch (SQLException e)
+        {
+            throw new AMQStoreException("Error deleting Link " + link + " from database: " + e.getMessage(), e);
+        }
+        finally
+        {
+            closePreparedStatement(stmt);
+            closeConnection(conn);
+        }
+
+
+    }
+
+    public void createBridge(final Bridge bridge) throws AMQStoreException
+    {
+        _logger.debug("public void createBridge(BrokerLink = " + bridge + "): called");
+
+        if (_state != State.RECOVERING)
+        {
+            try
+            {
+                Connection conn = newAutoCommitConnection();
+
+                PreparedStatement stmt = conn.prepareStatement(FIND_BRIDGE);
+                try
+                {
+
+                    UUID id = bridge.getId();
+                    stmt.setLong(1, id.getLeastSignificantBits());
+                    stmt.setLong(2, id.getMostSignificantBits());
+                    ResultSet rs = stmt.executeQuery();
+                    try
+                    {
+
+                        // If we don't have any data in the result set then we can add this queue
+                        if (!rs.next())
+                        {
+                            PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_BRIDGES);
+
+                            try
+                            {
+
+                                insertStmt.setLong(1, id.getLeastSignificantBits());
+                                insertStmt.setLong(2, id.getMostSignificantBits());
+
+                                insertStmt.setLong(3, bridge.getCreateTime());
+
+                                UUID linkId = bridge.getLink().getId();
+                                insertStmt.setLong(4, linkId.getLeastSignificantBits());
+                                insertStmt.setLong(5, linkId.getMostSignificantBits());
+
+                                byte[] argumentBytes = convertStringMapToBytes(bridge.getArguments());
+                                ByteArrayInputStream bis = new ByteArrayInputStream(argumentBytes);
+
+                                insertStmt.setBinaryStream(6,bis,argumentBytes.length);
+
+                                insertStmt.execute();
+                            }
+                            finally
+                            {
+                                insertStmt.close();
+                            }
+                        }
+                    }
+                    finally
+                    {
+                        rs.close();
+                    }
+                }
+                finally
+                {
+                    stmt.close();
+                }
+                conn.close();
+
+            }
+            catch (SQLException e)
+            {
+                throw new AMQStoreException("Error writing " + bridge + " to database: " + e.getMessage(), e);
+            }
+        }
+    }
+
+    public void deleteBridge(final Bridge bridge) throws AMQStoreException
+    {
+        _logger.debug("public void deleteBridge( " + bridge + "): called");
+        Connection conn = null;
+        PreparedStatement stmt = null;
+        try
+        {
+            conn = newAutoCommitConnection();
+            stmt = conn.prepareStatement(DELETE_FROM_BRIDGES);
+            stmt.setLong(1, bridge.getId().getLeastSignificantBits());
+            stmt.setLong(2, bridge.getId().getMostSignificantBits());
+            int results = stmt.executeUpdate();
+
+            if (results == 0)
+            {
+                throw new AMQStoreException("Bridge " + bridge + " not found");
+            }
+        }
+        catch (SQLException e)
+        {
+            throw new AMQStoreException("Error deleting bridge " + bridge + " from database: " + e.getMessage(), e);
+        }
+        finally
+        {
+            closePreparedStatement(stmt);
+            closeConnection(conn);
+        }
+
+    }
+
     public Transaction newTransaction()
     {
         return new DerbyTransaction();
@@ -1678,14 +2144,26 @@ public class DerbyMessageStore implement
             }
         }
 
-        public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+        public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
         {
-            DerbyMessageStore.this.enqueueMessage(_connWrapper, queue, messageId);
+            if(message.getStoredMessage() instanceof StoredDerbyMessage)
+            {
+                try
+                {
+                    ((StoredDerbyMessage)message.getStoredMessage()).store(_connWrapper.getConnection());
+                }
+                catch (SQLException e)
+                {
+                    throw new AMQStoreException("Exception on enqueuing message " + _messageId, e);
+                }
+            }
+
+            DerbyMessageStore.this.enqueueMessage(_connWrapper, queue, message.getMessageNumber());
         }
 
-        public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQStoreException
+        public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
         {
-            DerbyMessageStore.this.dequeueMessage(_connWrapper, queue, messageId);
+            DerbyMessageStore.this.dequeueMessage(_connWrapper, queue, message.getMessageNumber());
 
         }
 
@@ -1709,8 +2187,11 @@ public class DerbyMessageStore implement
     {
 
         private final long _messageId;
+        private StorableMessageMetaData _metaData;
         private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
-        private Connection _conn;
+        private byte[] _data;
+        private volatile SoftReference<byte[]> _dataRef;
+        
 
         StoredDerbyMessage(long messageId, StorableMessageMetaData metaData)
         {
@@ -1721,27 +2202,19 @@ public class DerbyMessageStore implement
         StoredDerbyMessage(long messageId,
                            StorableMessageMetaData metaData, boolean persist)
         {
-            try
-            {
-                _messageId = messageId;
+            _messageId = messageId;
+            
 
-                _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
-                if(persist)
-                {
-                    _conn = newConnection();
-                    storeMetaData(_conn, messageId, metaData);
-                }
-            }
-            catch (SQLException e)
+            _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
+            if(persist)
             {
-                throw new RuntimeException(e);
+                _metaData = metaData;    
             }
-
         }
 
         public StorableMessageMetaData getMetaData()
         {
-            StorableMessageMetaData metaData = _metaDataRef.get();
+            StorableMessageMetaData metaData = _metaData == null ? _metaDataRef.get() : _metaData;
             if(metaData == null)
             {
                 try
@@ -1765,27 +2238,62 @@ public class DerbyMessageStore implement
 
         public void addContent(int offsetInMessage, java.nio.ByteBuffer src)
         {
-            DerbyMessageStore.this.addContent(_conn, _messageId, offsetInMessage, src);
+            src = src.slice();
+
+            if(_data == null)
+            {
+                _data = new byte[src.remaining()];
+                _dataRef = new SoftReference<byte[]>(_data);
+                src.duplicate().get(_data);
+            }
+            else
+            {
+                byte[] oldData = _data;
+                _data = new byte[oldData.length + src.remaining()];
+                _dataRef = new SoftReference<byte[]>(_data);
+
+                System.arraycopy(oldData,0,_data,0,oldData.length);
+                src.duplicate().get(_data, oldData.length, src.remaining());
+            }
+            
         }
 
         public int getContent(int offsetInMessage, java.nio.ByteBuffer dst)
         {
-            return DerbyMessageStore.this.getContent(_messageId, offsetInMessage, dst);
+            byte[] data = _dataRef == null ? null : _dataRef.get();
+            if(data != null)
+            {
+                int length = Math.min(dst.remaining(), data.length - offsetInMessage);
+                dst.put(data, offsetInMessage, length);
+                return length;
+            }
+            else
+            {
+                return DerbyMessageStore.this.getContent(_messageId, offsetInMessage, dst);
+            }
+        }
+
+
+        public ByteBuffer getContent(int offsetInMessage, int size)
+        {
+            ByteBuffer buf = ByteBuffer.allocate(size);
+            getContent(offsetInMessage, buf);
+            buf.position(0);
+            return  buf;
         }
 
-        public StoreFuture flushToStore()
+        public synchronized StoreFuture flushToStore()
         {
             try
             {
-                if(_conn != null)
+                if(_metaData != null)
                 {
-                    if(_logger.isDebugEnabled())
-                    {
-                        _logger.debug("Flushing message " + _messageId + " to store");
-                    }
+                    Connection conn = newConnection();
+
+                    store(conn);
                     
-                    _conn.commit();
-                    _conn.close();
+                    conn.commit();
+                    conn.close();
                 }
             }
             catch (SQLException e)
@@ -1796,16 +2304,34 @@ public class DerbyMessageStore implement
                 }
                 throw new RuntimeException(e);
             }
-            finally
+            return IMMEDIATE_FUTURE;
+        }
+
+        private synchronized void store(final Connection conn) throws SQLException
+        {
+            if(_metaData != null)
             {
-                _conn = null;
+                try
+                {
+                    storeMetaData(conn, _messageId, _metaData);
+                    DerbyMessageStore.this.addContent(conn, _messageId, 0,
+                                                      _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data));
+                }
+                finally
+                {
+                    _metaData = null;
+                    _data = null;
+                }
+            }
+
+            if(_logger.isDebugEnabled())
+            {
+                _logger.debug("Storing message " + _messageId + " to store");
             }
-            return IMMEDIATE_FUTURE;
         }
 
         public void remove()
         {
-            flushToStore();
             DerbyMessageStore.this.removeMessage(_messageId);
         }
     }
@@ -1839,4 +2365,5 @@ public class DerbyMessageStore implement
             }
         }
     }
+
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java Thu Mar  1 15:42:44 2012
@@ -25,6 +25,8 @@ import org.apache.qpid.AMQStoreException
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.federation.Bridge;
+import org.apache.qpid.server.federation.BrokerLink;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.queue.AMQQueue;
 
@@ -128,4 +130,12 @@ public interface DurableConfigurationSto
      * @throws AMQStoreException If the operation fails for any reason.
      */
     void updateQueue(AMQQueue queue) throws AMQStoreException;
+    
+    void createBrokerLink(BrokerLink link) throws AMQStoreException;
+    
+    void deleteBrokerLink(BrokerLink link) throws AMQStoreException;
+    
+    void createBridge(Bridge bridge) throws AMQStoreException;
+    
+    void deleteBridge(Bridge bridge) throws AMQStoreException;
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java Thu Mar  1 15:42:44 2012
@@ -31,14 +31,18 @@ import org.apache.qpid.AMQStoreException
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.federation.Bridge;
+import org.apache.qpid.server.federation.BrokerLink;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
 import org.apache.qpid.server.logging.messages.MessageStoreMessages;
+import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 
 /** A simple message store that stores the messages in a threadsafe structure in memory. */
-public class MemoryMessageStore implements MessageStore
+public class MemoryMessageStore implements MessageStore, DurableConfigurationStore
 {
     private static final Logger _log = Logger.getLogger(MemoryMessageStore.class);
 
@@ -53,11 +57,11 @@ public class MemoryMessageStore implemen
 
     private static final Transaction IN_MEMORY_TRANSACTION = new Transaction()
     {
-        public void enqueueMessage(TransactionLogResource  queue, Long messageId) throws AMQStoreException
+        public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
         {
         }
 
-        public void dequeueMessage(TransactionLogResource  queue, Long messageId) throws AMQStoreException
+        public void dequeueMessage(TransactionLogResource  queue, EnqueableMessage message) throws AMQStoreException
         {
         }
 
@@ -155,6 +159,26 @@ public class MemoryMessageStore implemen
         // Not required to do anything
     }
 
+    public void createBrokerLink(final BrokerLink link) throws AMQStoreException
+    {
+
+    }
+
+    public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException
+    {
+
+    }
+
+    public void createBridge(final Bridge bridge) throws AMQStoreException
+    {
+
+    }
+
+    public void deleteBridge(final Bridge bridge) throws AMQStoreException
+    {
+
+    }
+
     public void configureTransactionLog(String name,
                                         TransactionLogRecoveryHandler recoveryHandler,
                                         Configuration storeConfiguration,

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java Thu Mar  1 15:42:44 2012
@@ -20,14 +20,16 @@
  */
 package org.apache.qpid.server.store;
 
+import org.apache.qpid.AMQStoreException;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.server.message.EnqueableMessage;
 
 /**
  * MessageStore defines the interface to a storage area, which can be used to preserve the state of messages.
  *
  */
-public interface MessageStore extends DurableConfigurationStore, TransactionLog
+public interface MessageStore
 {
     StoreFuture IMMEDIATE_FUTURE = new StoreFuture()
         {
@@ -77,4 +79,69 @@ public interface MessageStore extends Du
     boolean isPersistent();
 
 
+
+    public static interface Transaction
+    {
+        /**
+         * Places a message onto a specified queue, in a given transactional context.
+         *
+         *
+         *
+         * @param queue     The queue to place the message on.
+         * @param message
+         * @throws org.apache.qpid.AMQStoreException If the operation fails for any reason.
+         */
+        void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException;
+
+        /**
+         * Extracts a message from a specified queue, in a given transactional context.
+         *
+         * @param queue     The queue to place the message on.
+         * @param message The message to dequeue.
+         * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
+         */
+        void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException;
+
+
+        /**
+         * Commits all operations performed within a given transactional context.
+         *
+         * @throws AMQStoreException If the operation fails for any reason.
+         */
+        void commitTran() throws AMQStoreException;
+
+        /**
+         * Commits all operations performed within a given transactional context.
+         *
+         * @throws AMQStoreException If the operation fails for any reason.
+         */
+        StoreFuture commitTranAsync() throws AMQStoreException;
+
+        /**
+         * Abandons all operations performed within a given transactional context.
+         *
+         * @throws AMQStoreException If the operation fails for any reason.
+         */
+        void abortTran() throws AMQStoreException;
+
+
+
+    }
+
+    public void configureTransactionLog(String name,
+                      TransactionLogRecoveryHandler recoveryHandler,
+                      Configuration storeConfiguration,
+                      LogSubject logSubject) throws Exception;
+
+    Transaction newTransaction();
+
+
+
+    public static interface StoreFuture
+    {
+        boolean isComplete();
+
+        void waitForCompletion();
+    }
+
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java?rev=1295627&r1=1295626&r2=1295627&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java Thu Mar  1 15:42:44 2012
@@ -30,5 +30,7 @@ public interface StorableMessageMetaData
 
     int writeToBuffer(int offsetInMetaData, ByteBuffer dest);
 
+    int getContentSize();
+
     boolean isPersistent();
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org