You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/10/11 17:10:48 UTC

svn commit: r824084 [1/5] - in /qpid/branches/java-broker-0-10/qpid/java: broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/ broker/src/main/java/org/apache/q...

Author: rgodfrey
Date: Sun Oct 11 15:10:43 2009
New Revision: 824084

URL: http://svn.apache.org/viewvc?rev=824084&view=rev
Log:
Migrate 0-8 code to use same Txn code as 0-10

Added:
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/EnqueableMessage.java   (with props)
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java   (with props)
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java   (with props)
Removed:
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/StoreMessageOperation.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TxnOp.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java
Modified:
    qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
    qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessageReference.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageReference.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/TransferMessageReference.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessageHandle.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
    qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/client/SimpleConnectionTest.java
    qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java
    qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
    qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
    qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java

Modified: qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java Sun Oct 11 15:10:43 2009
@@ -197,7 +197,7 @@
         return false;
     }
 
-    public ArrayList<AMQQueue> route(InboundMessage payload) throws AMQException
+    public ArrayList<AMQQueue> route(InboundMessage payload)
     {
         
         Long value = new Long(SizeOf.getUsedMemory());

Modified: qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java Sun Oct 11 15:10:43 2009
@@ -113,7 +113,7 @@
     {
     }
 
-    public ArrayList<AMQQueue> route(InboundMessage message) throws AMQException
+    public ArrayList<AMQQueue> route(InboundMessage message)
     {
         return new ArrayList<AMQQueue>();
     }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java Sun Oct 11 15:10:43 2009
@@ -63,7 +63,7 @@
 import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.queue.AMQQueueMBean;
 import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.actors.ManagementActor;
@@ -78,7 +78,7 @@
     private final QueueRegistry _queueRegistry;
     private final ExchangeRegistry _exchangeRegistry;
     private final ExchangeFactory _exchangeFactory;
-    private final MessageStore _messageStore;
+    private final DurableConfigurationStore _durableConfig;
 
     private final VirtualHost.VirtualHostMBean _virtualHostMBean;
 
@@ -92,7 +92,7 @@
 
         _queueRegistry = virtualHost.getQueueRegistry();
         _exchangeRegistry = virtualHost.getExchangeRegistry();
-        _messageStore = virtualHost.getMessageStore();
+        _durableConfig = virtualHost.getDurableConfigurationStore();
         _exchangeFactory = virtualHost.getExchangeFactory();
     }
 
@@ -277,7 +277,7 @@
                                                        null);
             if (queue.isDurable() && !queue.isAutoDelete())
             {
-                _messageStore.createQueue(queue);
+                _durableConfig.createQueue(queue);
             }
 
             _queueRegistry.registerQueue(queue);
@@ -319,7 +319,7 @@
             queue.delete();
             if (queue.isDurable())
             {
-                _messageStore.removeQueue(queue);
+                _durableConfig.removeQueue(queue);
             }
         }
         catch (AMQException ex)

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Sun Oct 11 15:10:43 2009
@@ -20,24 +20,20 @@
  */
 package org.apache.qpid.server;
 
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.framing.*;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
 import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
 import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exchange.NoRouteException;
 import org.apache.qpid.server.flow.FlowCreditManager;
 import org.apache.qpid.server.flow.Pre0_10CreditManager;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -47,14 +43,11 @@
 import org.apache.qpid.server.subscription.ClientDeliveryMethod;
 import org.apache.qpid.server.subscription.RecordDeliveryMethod;
 import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.txn.LocalTransactionalContext;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.txn.*;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.LogMessage;
 import org.apache.qpid.server.logging.messages.ChannelMessages;
 import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
 import org.apache.qpid.server.logging.actors.AMQPChannelActor;
@@ -64,7 +57,11 @@
 {
     public static final int DEFAULT_PREFETCH = 5000;
 
-    private static final Logger _log = Logger.getLogger(AMQChannel.class);
+    private static final Logger _logger = Logger.getLogger(AMQChannel.class);
+
+    private static final boolean MSG_AUTH =
+        ApplicationRegistry.getInstance().getConfiguration().getMsgAuth();
+
 
     private final int _channelId;
 
@@ -97,15 +94,13 @@
 
     private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH);
 
+    // Set of messages being acknoweledged in the current transaction
+    private SortedSet<QueueEntry> _acknowledgedMessages = new TreeSet<QueueEntry>();
+
     private final AtomicBoolean _suspended = new AtomicBoolean(false);
 
-    private TransactionalContext _txnContext;
+    private Transaction _transaction;
 
-    /**
-     * A context used by the message store enabling it to track context for a given channel even across thread
-     * boundaries
-     */
-    private final StoreContext _storeContext;
 
     private final List<RequiredDeliveryException> _returnMessages = new LinkedList<RequiredDeliveryException>();
 
@@ -122,6 +117,10 @@
 
     private LogActor _actor;
     private LogSubject _logSubject;
+    private volatile boolean _rollingBack;
+
+    private static final Runnable NULL_TASK = new Runnable() { public void run() {} };
+    private List<QueueEntry> _resendList = new ArrayList<QueueEntry>();
 
     public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
             throws AMQException
@@ -131,22 +130,19 @@
 
         _actor = new AMQPChannelActor(this, session.getLogActor().getRootMessageLogger());
         _logSubject = new ChannelLogSubject(this);
-        
-        _actor.message(ChannelMessages.CHN_1001());
-
-        _storeContext = new StoreContext("Session: " + session.getClientIdentifier() + "; channel: " + channelId);
 
+        _actor.message(ChannelMessages.CHN_1001());
 
         _messageStore = messageStore;
 
         // by default the session is non-transactional
-        _txnContext = new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages);
+        _transaction = new AutoCommitTransaction(_messageStore);
     }
 
     /** Sets this channel to be part of a local transaction */
     public void setLocalTransactional()
     {
-        _txnContext = new LocalTransactionalContext(this);
+        _transaction = new LocalTransaction(_messageStore);
     }
 
     public boolean isTransactional()
@@ -154,7 +150,7 @@
         // this does not look great but there should only be one "non-transactional"
         // transactional context, while there could be several transactional ones in
         // theory
-        return !(_txnContext instanceof NonTransactionalContext);
+        return !(_transaction instanceof AutoCommitTransaction);
     }
 
     public int getChannelId()
@@ -165,43 +161,61 @@
     public void setPublishFrame(MessagePublishInfo info, final Exchange e) throws AMQException
     {
 
-        _currentMessage = new IncomingMessage(_messageStore.getNewMessageId(), info, _txnContext, _session);
-        _currentMessage.setMessageStore(_messageStore);
+        _currentMessage = new IncomingMessage(_messageStore.getNewMessageId(), info, _session);
         _currentMessage.setExchange(e);
     }
 
     public void publishContentHeader(ContentHeaderBody contentHeaderBody)
             throws AMQException
     {
-        StoreContext.setCurrentContext(_storeContext);
-        try
+        if (_currentMessage == null)
         {
-            if (_currentMessage == null)
+            throw new AMQException("Received content header without previously receiving a BasicPublish frame");
+        }
+        else
+        {
+            if (_logger.isDebugEnabled())
             {
-                throw new AMQException("Received content header without previously receiving a BasicPublish frame");
+                _logger.debug("Content header received on channel " + _channelId);
             }
-            else
-            {
-                if (_log.isDebugEnabled())
-                {
-                    _log.debug("Content header received on channel " + _channelId);
-                }
 
-                _currentMessage.setContentHeaderBody(contentHeaderBody);
+            _currentMessage.setContentHeaderBody(contentHeaderBody);
+
+            _currentMessage.setExpiration();
 
-                _currentMessage.setExpiration();
+            routeCurrentMessage();
 
-                routeCurrentMessage();
+            MessageMetaData mmd = _currentMessage.routingComplete(_messageStore, _messageHandleFactory);
 
-                _currentMessage.routingComplete(_messageStore, _messageHandleFactory);
+            if(_currentMessage.isPersistent())
+            {
+                final Long messageNumber = _currentMessage.getMessageNumber();
+
+                _messageStore.storeMessageMetaData(messageNumber, mmd);
 
-                deliverCurrentMessageIfComplete();
+                _transaction.addPostCommitAction(new Transaction.Action()
+                {
+
+                    public void postCommit()
+                    {
+                    }
 
+                    public void onRollback()
+                    {
+                        try
+                        {
+                            _messageStore.removeMessage(messageNumber);
+                        }
+                        catch (AMQException e)
+                        {
+
+                        }
+                    }
+                });
             }
-        }
-        finally
-        {
-            StoreContext.clearCurrentContext();
+
+            deliverCurrentMessageIfComplete();
+
         }
     }
 
@@ -213,21 +227,36 @@
         {
             try
             {
-                _currentMessage.deliverToQueues();
-            }
-            catch (NoRouteException e)
-            {
-                _returnMessages.add(e);
-            }
-            catch(UnauthorizedAccessException ex)
-            {
-                _returnMessages.add(ex);
+
+                final ArrayList<AMQQueue> destinationQueues = _currentMessage.getDestinationQueues();
+
+                if(!checkMessageUserId(_currentMessage.getContentHeader()))
+                {
+                    _transaction.addPostCommitAction(new WriteReturnAction(AMQConstant.ACCESS_REFUSED, "Access Refused", createAMQMessage(_currentMessage)));
+                }
+                else
+                {
+                    if(destinationQueues == null || _currentMessage.getDestinationQueues().isEmpty())
+                    {
+                        if (_currentMessage.isMandatory() || _currentMessage.isImmediate())
+                        {
+                            _transaction.addPostCommitAction(new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message", createAMQMessage(_currentMessage)));
+                        }
+                        else
+                        {
+                            _logger.warn("MESSAGE DISCARDED: No routes for message - " + createAMQMessage(_currentMessage));
+                        }
+
+                    }
+                    else
+                    {
+                        _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues));
+
+                    }
+                }
             }
             finally
             {
-                // callback to allow the context to do any post message processing
-                // primary use is to allow message return processing in the non-tx case
-                _txnContext.messageProcessed(_session);
                 _currentMessage = null;
             }
         }
@@ -236,15 +265,14 @@
 
     public void publishContentBody(ContentBody contentBody) throws AMQException
     {
-        StoreContext.setCurrentContext(_storeContext);
         if (_currentMessage == null)
         {
             throw new AMQException("Received content body without previously receiving a JmsPublishBody");
         }
 
-        if (_log.isDebugEnabled())
+        if (_logger.isDebugEnabled())
         {
-            _log.debug(debugIdentity() + "Content body received on channel " + _channelId);
+            _logger.debug(debugIdentity() + "Content body received on channel " + _channelId);
         }
 
         try
@@ -252,10 +280,19 @@
 
             // returns true iff the message was delivered (i.e. if all data was
             // received
-            _currentMessage.addContentBodyFrame(
-                    _session.getMethodRegistry().getProtocolVersionMethodConverter().convertToContentChunk(
-                            contentBody));
+            final ContentChunk contentChunk =
+                    _session.getMethodRegistry().getProtocolVersionMethodConverter().convertToContentChunk(contentBody);
+
+            int chunkId = _currentMessage.addContentBodyFrame(contentChunk);
 
+            if(_currentMessage.isPersistent())
+            {
+                final Long messageNumber = _currentMessage.getMessageNumber();
+                _messageStore.storeContentBodyChunk(messageNumber, chunkId,
+                                                    contentChunk, _currentMessage.allContentReceived());
+
+
+            }
 
             deliverCurrentMessageIfComplete();
         }
@@ -266,23 +303,11 @@
             _currentMessage = null;
             throw e;
         }
-        finally
-        {
-            StoreContext.clearCurrentContext();
-        }
     }
 
     protected void routeCurrentMessage() throws AMQException
     {
-        try
-        {
-            _currentMessage.route();            
-        }
-        catch (NoRouteException e)
-        {
-            //_currentMessage.incrementReference();
-            _returnMessages.add(e);
-        }
+        _currentMessage.route();
     }
 
     public long getNextDeliveryTag()
@@ -359,12 +384,12 @@
         Subscription sub = _tag2SubscriptionMap.remove(consumerTag);
         if (sub != null)
         {
-            try 
+            try
             {
                 sub.getSendLock();
                 sub.getQueue().unregisterSubscription(sub);
             }
-            finally 
+            finally
             {
                 sub.releaseSendLock();
             }
@@ -372,7 +397,7 @@
         }
         else
         {
-            _log.warn("Attempt to unsubscribe consumer with tag '"+consumerTag+"' which is not registered.");
+            _logger.warn("Attempt to unsubscribe consumer with tag '"+consumerTag+"' which is not registered.");
         }
         return false;
     }
@@ -384,15 +409,17 @@
      */
     public void close() throws AMQException
     {
-        _txnContext.rollback();
+
         unsubscribeAllConsumers();
+        _transaction.rollback();
+
         try
         {
             requeue();
         }
         catch (AMQException e)
         {
-            _log.error("Caught AMQException whilst attempting to reque:" + e);        
+            _logger.error("Caught AMQException whilst attempting to reque:" + e);
         }
 
         setClosing(true);
@@ -407,23 +434,23 @@
 
     private void unsubscribeAllConsumers() throws AMQException
     {
-        if (_log.isInfoEnabled())
+        if (_logger.isInfoEnabled())
         {
             if (!_tag2SubscriptionMap.isEmpty())
             {
-                _log.info("Unsubscribing all consumers on channel " + toString());
+                _logger.info("Unsubscribing all consumers on channel " + toString());
             }
             else
             {
-                _log.info("No consumers to unsubscribe on channel " + toString());
+                _logger.info("No consumers to unsubscribe on channel " + toString());
             }
         }
 
         for (Map.Entry<AMQShortString, Subscription> me : _tag2SubscriptionMap.entrySet())
         {
-            if (_log.isInfoEnabled())
+            if (_logger.isInfoEnabled())
             {
-                _log.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString());
+                _logger.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString());
             }
 
             Subscription sub = me.getValue();
@@ -437,7 +464,7 @@
             {
                 sub.releaseSendLock();
             }
-            
+
         }
 
         _tag2SubscriptionMap.clear();
@@ -453,17 +480,17 @@
      */
     public void addUnacknowledgedMessage(QueueEntry entry, long deliveryTag, Subscription subscription)
     {
-        if (_log.isDebugEnabled())
+        if (_logger.isDebugEnabled())
         {
             if (entry.getQueue() == null)
             {
-                _log.debug("Adding unacked message with a null queue:" + entry);
+                _logger.debug("Adding unacked message with a null queue:" + entry);
             }
             else
             {
-                if (_log.isDebugEnabled())
+                if (_logger.isDebugEnabled())
                 {
-                    _log.debug(debugIdentity() + " Adding unacked message(" + entry.getMessage().toString() + " DT:" + deliveryTag
+                    _logger.debug(debugIdentity() + " Adding unacked message(" + entry.getMessage().toString() + " DT:" + deliveryTag
                                + ") with a queue(" + entry.getQueue() + ") for " + subscription);
                 }
             }
@@ -491,27 +518,13 @@
         // we must create a new map since all the messages will get a new delivery tag when they are redelivered
         Collection<QueueEntry> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
 
-        // Deliver these messages out of the transaction as their delivery was never
-        // part of the transaction only the receive.
-        TransactionalContext deliveryContext = null;
-
         if (!messagesToBeDelivered.isEmpty())
         {
-            if (_log.isInfoEnabled())
+            if (_logger.isInfoEnabled())
             {
-                _log.info("Requeuing " + messagesToBeDelivered.size() + " unacked messages. for " + toString());
+                _logger.info("Requeuing " + messagesToBeDelivered.size() + " unacked messages. for " + toString());
             }
 
-            if (!(_txnContext instanceof NonTransactionalContext))
-            {
-
-                    deliveryContext =
-                            new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages);
-            }
-            else
-            {
-                deliveryContext = _txnContext;
-            }
         }
 
         for (QueueEntry unacked : messagesToBeDelivered)
@@ -524,13 +537,10 @@
                 // Ensure message is released for redelivery
                 unacked.release();
 
-                // Deliver Message
-                deliveryContext.requeue(unacked);
-
             }
             else
             {
-                unacked.discard(_storeContext);
+                unacked.discard();
             }
         }
 
@@ -555,43 +565,22 @@
             // Ensure message is released for redelivery
             if (!unacked.isQueueDeleted())
             {
-                unacked.release();
-            }
-
 
-            // Deliver these messages out of the transaction as their delivery was never
-            // part of the transaction only the receive.
-            TransactionalContext deliveryContext;
-            if (!(_txnContext instanceof NonTransactionalContext))
-            {
-
-                deliveryContext =
-                            new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages);
-
-            }
-            else
-            {
-                deliveryContext = _txnContext;
-            }
+                // Ensure message is released for redelivery
+                unacked.release();
 
-            if (!unacked.isQueueDeleted())
-            {
-                // Redeliver the messages to the front of the queue
-                deliveryContext.requeue(unacked);
-                // Deliver increments the message count but we have already deliverted this once so don't increment it again
-                // this was because deliver did an increment changed this.
             }
             else
             {
-                _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked
+                _logger.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked
                           + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message.");
 
-                unacked.discard(_storeContext);
+                unacked.discard();
             }
         }
         else
         {
-            _log.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists."
+            _logger.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists."
                       + _unacknowledgedMessageMap.size());
 
         }
@@ -612,28 +601,31 @@
         final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
         final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
 
-        if (_log.isDebugEnabled())
+        if (_logger.isDebugEnabled())
         {
-            _log.debug("unacked map Size:" + _unacknowledgedMessageMap.size());
+            _logger.debug("unacked map Size:" + _unacknowledgedMessageMap.size());
         }
 
         // Process the Unacked-Map.
         // Marking messages who still have a consumer for to be resent
         // and those that don't to be requeued.
-        _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
-                                                                    msgToResend, requeue, _storeContext));
+        _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap,
+                                                                    msgToRequeue,
+                                                                    msgToResend,
+                                                                    requeue,
+                                                                    _messageStore));
 
 
         // Process Messages to Resend
-        if (_log.isDebugEnabled())
+        if (_logger.isDebugEnabled())
         {
             if (!msgToResend.isEmpty())
             {
-                _log.debug("Preparing (" + msgToResend.size() + ") message to resend.");
+                _logger.debug("Preparing (" + msgToResend.size() + ") message to resend.");
             }
             else
             {
-                _log.debug("No message to resend.");
+                _logger.debug("No message to resend.");
             }
         }
 
@@ -653,7 +645,7 @@
             // i.e. The channel hasn't been server side suspended.
             // if (isSuspended())
             // {
-            // _log.info("Channel is suspended so requeuing");
+            // _logger.info("Channel is suspended so requeuing");
             // //move this message to requeue
             // msgToRequeue.add(message);
             // }
@@ -669,8 +661,8 @@
 
             if (sub != null)
             {
-                
-                if(!queue.resend(message, sub))
+
+                if(!queue.resend(message,sub))
                 {
                     msgToRequeue.put(deliveryTag, message);
                 }
@@ -678,9 +670,9 @@
             else
             {
 
-                if (_log.isInfoEnabled())
+                if (_logger.isInfoEnabled())
                 {
-                    _log.info("DeliveredSubscription not recorded so just requeueing(" + message.toString()
+                    _logger.info("DeliveredSubscription not recorded so just requeueing(" + message.toString()
                               + ")to prevent loss");
                 }
                 // move this message to requeue
@@ -689,91 +681,28 @@
         } // for all messages
         // } else !isSuspend
 
-        if (_log.isInfoEnabled())
+        if (_logger.isInfoEnabled())
         {
             if (!msgToRequeue.isEmpty())
             {
-                _log.info("Preparing (" + msgToRequeue.size() + ") message to requeue to.");
+                _logger.info("Preparing (" + msgToRequeue.size() + ") message to requeue to.");
             }
         }
 
-        // Deliver these messages out of the transaction as their delivery was never
-        // part of the transaction only the receive.
-        TransactionalContext deliveryContext;
-        if (!(_txnContext instanceof NonTransactionalContext))
-        {
-
-            deliveryContext =
-                        new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages);
-        }
-        else
-        {
-            deliveryContext = _txnContext;
-        }
-
         // Process Messages to Requeue at the front of the queue
         for (Map.Entry<Long, QueueEntry> entry : msgToRequeue.entrySet())
         {
             QueueEntry message = entry.getValue();
             long deliveryTag = entry.getKey();
-            
-            message.release();
-            message.setRedelivered(true);
+            _unacknowledgedMessageMap.remove(deliveryTag);
 
-            deliveryContext.requeue(message);
+            message.setRedelivered(true);
+            message.release();
 
-            _unacknowledgedMessageMap.remove(deliveryTag);
         }
     }
 
-    /**
-     * Callback indicating that a queue has been deleted. We must update the structure of unacknowledged messages to
-     * remove the queue reference and also decrement any message reference counts, without actually removing the item
-     * since we may get an ack for a delivery tag that was generated from the deleted queue.
-     *
-     * @param queue the queue that has been deleted
-     *
-     */
- /*   public void queueDeleted(final AMQQueue queue)
-    {
-        try
-        {
-            _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
-            {
-                public boolean callback(UnacknowledgedMessage message)
-                {
-                    if (message.getQueue() == queue)
-                    {
-                        try
-                        {
-                            message.discard(_storeContext);
-                            message.setQueueDeleted(true);
 
-                        }
-                        catch (AMQException e)
-                        {
-                            _log.error(
-                                    "Error decrementing ref count on message " + message.getMessage().getMessageId() + ": " + e, e);
-                            throw new RuntimeException(e);
-                        }
-                    }
-
-                    return false;
-                }
-
-                public void visitComplete()
-                {
-                }
-            });
-        }
-        catch (AMQException e)
-        {
-            _log.error("Unexpected Error while handling deletion of queue", e);
-            throw new RuntimeException(e);
-        }
-
-    }
-*/
     /**
      * Acknowledge one or more messages.
      *
@@ -785,7 +714,17 @@
      */
     public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException
     {
-        _unacknowledgedMessageMap.acknowledgeMessage(deliveryTag, multiple, _txnContext);
+        Collection<QueueEntry> ackedMessages = getAckedMessages(deliveryTag, multiple);
+        _transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages));
+    }
+
+    private Collection<QueueEntry> getAckedMessages(long deliveryTag, boolean multiple)
+    {
+
+        Map<Long, QueueEntry> ackedMessageMap = new LinkedHashMap<Long,QueueEntry>();
+        _unacknowledgedMessageMap.collect(deliveryTag, multiple, ackedMessageMap);
+        _unacknowledgedMessageMap.remove(ackedMessageMap);
+        return ackedMessageMap.values();
     }
 
     /**
@@ -879,12 +818,69 @@
             throw new AMQException("Fatal error: commit called on non-transactional channel");
         }
 
-        _txnContext.commit();
+        _transaction.commit();
+
     }
 
     public void rollback() throws AMQException
     {
-        _txnContext.rollback();
+        rollback(NULL_TASK);
+    }
+
+    public void rollback(Runnable postRollbackTask) throws AMQException
+    {
+        if (!isTransactional())
+        {
+            throw new AMQException("Fatal error: commit called on non-transactional channel");
+        }
+
+        // stop all subscriptions
+        _rollingBack = true;
+        boolean requiresSuspend = _suspended.compareAndSet(false,true);
+
+        // ensure all subscriptions have seen the change to the channel state
+        for(Subscription sub : _tag2SubscriptionMap.values())
+        {
+            sub.getSendLock();
+            sub.releaseSendLock();
+        }
+
+        try
+        {
+            _transaction.rollback();
+        }
+        finally
+        {
+            _rollingBack = false;
+        }
+
+        postRollbackTask.run();
+
+        for(QueueEntry entry : _resendList)
+        {
+            Subscription sub = entry.getDeliveredSubscription();
+            if(sub == null || sub.isClosed())
+            {
+                entry.release();
+            }
+            else
+            {
+                sub.getQueue().resend(entry, sub);
+            }
+        }
+        _resendList.clear();
+
+        if(requiresSuspend)
+        {
+            _suspended.set(false);
+            for(Subscription sub : _tag2SubscriptionMap.values())
+            {
+                sub.getQueue().deliverAsync(sub);
+            }
+
+        }
+
+
     }
 
     public String toString()
@@ -902,23 +898,22 @@
         return _defaultQueue;
     }
 
-    public StoreContext getStoreContext()
-    {
-        return _storeContext;
-    }
-
     public void processReturns() throws AMQException
     {
         if (!_returnMessages.isEmpty())
         {
-            StoreContext sc =StoreContext.setCurrentContext(_storeContext);
+
             for (RequiredDeliveryException bouncedMessage : _returnMessages)
             {
                 ServerMessage serverMessage = bouncedMessage.getAMQMessage();
                 if(serverMessage instanceof AMQMessage)
                 {
                     AMQMessage message = (AMQMessage) serverMessage;
-                    _session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(),
+                    _session.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(),
+                                                                      message.getContentHeaderBody(),
+                                                                      message.getBodyFrameIterator(_session,_channelId),
+                                                                      _channelId,
+                                                                      bouncedMessage.getReplyCode().getCode(),
                                                                      new AMQShortString(bouncedMessage.getMessage()));
 
                 }
@@ -929,18 +924,12 @@
                 }
                 bouncedMessage.release();
             }
-            StoreContext.setCurrentContext(sc);
+
 
             _returnMessages.clear();
         }
     }
 
-
-    public TransactionalContext getTransactionalContext()
-    {
-        return _txnContext;
-    }
-
     public boolean isClosing()
     {
         return _closing;
@@ -962,11 +951,6 @@
         _creditManager.setCreditLimits(prefetchSize, prefetchCount);
     }
 
-    public List<RequiredDeliveryException> getReturnMessages()
-    {
-        return _returnMessages;
-    }
-
     public MessageStore getMessageStore()
     {
         return _messageStore;
@@ -981,7 +965,7 @@
                 ServerMessage msg = entry.getMessage();
                 if(msg instanceof AMQMessage)
                 {
-                    getProtocolSession().getProtocolOutputConverter().writeDeliver((AMQMessage)msg, getChannelId(),
+                    getProtocolSession().getProtocolOutputConverter().writeDeliver(entry, getChannelId(),
                                                                                    deliveryTag, sub.getConsumerTag());
                 }
                 else
@@ -1011,6 +995,203 @@
         return _recordDeliveryMethod;
     }
 
+
+    private AMQMessage createAMQMessage(IncomingMessage incomingMessage)
+            throws AMQException
+    {
+
+
+
+        final AMQMessageHandle messageHandle = incomingMessage.getMessageHandle();
+        final MessagePublishInfo messagePublishInfo = incomingMessage.getMessagePublishInfo();
+        final ContentHeaderBody header = incomingMessage.getContentHeader();
+
+
+
+        AMQMessage message = new AMQMessage(messageHandle, header, incomingMessage.getSize() ,messagePublishInfo);
+
+        message.setExpiration(incomingMessage.getExpiration());
+        message.setClientIdentifier(_session);
+        return message;
+    }
+
+    private boolean checkMessageUserId(ContentHeaderBody header)
+            throws UnauthorizedAccessException
+    {
+        AMQShortString userID =
+                header.properties instanceof BasicContentHeaderProperties
+                    ? ((BasicContentHeaderProperties) header.properties).getUserId()
+                    : null;
+
+        return (!MSG_AUTH || _session.getPrincipal().getName().equals(userID == null? "" : userID.toString()));
+
+    }
+
+    private class MessageDeliveryAction implements Transaction.Action
+    {
+        private IncomingMessage _incommingMessage;
+        private ArrayList<AMQQueue> _destinationQueues;
+
+        public MessageDeliveryAction(IncomingMessage currentMessage,
+                                     ArrayList<AMQQueue> destinationQueues)
+        {
+            _incommingMessage = currentMessage;
+            _destinationQueues = destinationQueues;
+        }
+
+        public void postCommit()
+        {
+            try
+            {
+                final boolean immediate = _incommingMessage.isImmediate();
+
+                Transaction txn = null;
+
+                for(AMQQueue queue : _destinationQueues)
+                {
+                    QueueEntry entry = queue.enqueue(createAMQMessage(_incommingMessage));
+                    queue.checkCapacity(AMQChannel.this);
+
+
+                    if(immediate && !entry.getDeliveredToConsumer() && entry.acquire())
+                    {
+                          if(txn == null)
+                          {
+                              txn = new LocalTransaction(_messageStore);
+                              Collection<QueueEntry> entries = new ArrayList<QueueEntry>(1);
+                              entries.add(entry);
+                              txn.enqueue(queue, entry.getMessage(), new MessageAcknowledgeAction(entries));
+                          }
+
+
+                        AMQMessage message = (AMQMessage) entry.getMessage();
+                                        _session.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(),
+                                                              message.getContentHeaderBody(),
+                                                              message.getBodyFrameIterator(_session,_channelId),
+                                                              _channelId,
+                                                              AMQConstant.NO_CONSUMERS.getCode(),
+                                                             new AMQShortString("Immediate delivery is not possible."));
+
+
+                    }
+
+                }
+                if(txn != null)
+                {
+                    txn.commit();
+                }
+            }
+            catch (AMQException e)
+            {
+                // TODO
+                throw new RuntimeException(e);
+            }
+
+
+
+
+
+        }
+
+        public void onRollback()
+        {
+            // Maybe keep track of entries that were created and then delete them here in case of failure
+            // to in memory enqueue
+        }
+    }
+
+    private class MessageAcknowledgeAction implements Transaction.Action
+    {
+        private final Collection<QueueEntry> _ackedMessages;
+
+
+        public MessageAcknowledgeAction(Collection<QueueEntry> ackedMessages)
+        {
+            _ackedMessages = ackedMessages;
+        }
+
+        public void postCommit()
+        {
+            try
+            {
+                for(QueueEntry entry : _ackedMessages)
+                {
+                    entry.discard();
+                }
+            }
+            finally
+            {
+                _acknowledgedMessages.clear();
+            }
+
+        }
+
+        public void onRollback()
+        {
+            // explicit rollbacks resend the message after the rollback-ok is sent
+            if(_rollingBack)
+            {
+                 _resendList.addAll(_ackedMessages);
+            }
+            else
+            {
+                try
+                {
+                        for(QueueEntry entry : _ackedMessages)
+                        {
+                            entry.release();
+                        }
+                }
+                finally
+                {
+                    _acknowledgedMessages.clear();
+                }
+            }
+
+        }
+    }
+
+    private class WriteReturnAction implements Transaction.Action
+    {
+        private final AMQConstant _errorCode;
+        private final AMQMessage _message;
+        private final String _description;
+
+        public WriteReturnAction(AMQConstant errorCode,
+                                 String description,
+                                 AMQMessage message)
+        {
+            _errorCode = errorCode;
+            _message = message;
+            _description = description;
+        }
+
+        public void postCommit()
+        {
+            try
+            {
+                _session.getProtocolOutputConverter().writeReturn(_message.getMessagePublishInfo(),
+                                                              _message.getContentHeaderBody(),
+                                                              _message.getBodyFrameIterator(_session,_channelId),
+                                                              _channelId,
+                                                              _errorCode.getCode(),
+                                                             new AMQShortString(_description));
+            }
+            catch (AMQException e)
+            {
+                //TODO
+                throw new RuntimeException(e);
+            }
+
+        }
+
+        public void onRollback()
+        {
+            //To change body of implemented methods use File | Settings | File Templates.
+        }
+    }
+
+
     public LogActor getLogActor()
     {
         return _actor;
@@ -1029,7 +1210,7 @@
         }
     }
 
-    public void unblock(AMQQueue queue) 
+    public void unblock(AMQQueue queue)
     {
         if(_blockingQueues.remove(queue))
         {

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java Sun Oct 11 15:10:43 2009
@@ -21,12 +21,11 @@
 package org.apache.qpid.server;
 
 import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
-import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
 import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.txn.Transaction;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.AMQException;
 import org.apache.log4j.Logger;
 
@@ -36,23 +35,23 @@
 {
     private static final Logger _log = Logger.getLogger(ExtractResendAndRequeue.class);
 
-    private Map<Long, QueueEntry> _msgToRequeue;
-    private Map<Long, QueueEntry> _msgToResend;
-    private boolean _requeueIfUnabletoResend;
-    private StoreContext _storeContext;
-    private UnacknowledgedMessageMap _unacknowledgedMessageMap;
+    private final Map<Long, QueueEntry> _msgToRequeue;
+    private final Map<Long, QueueEntry> _msgToResend;
+    private final boolean _requeueIfUnabletoResend;
+    private final UnacknowledgedMessageMap _unacknowledgedMessageMap;
+    private final TransactionLog _transactionLog;
 
-    public ExtractResendAndRequeue(UnacknowledgedMessageMap unacknowledgedMessageMap, 
+    public ExtractResendAndRequeue(UnacknowledgedMessageMap unacknowledgedMessageMap,
                                    Map<Long, QueueEntry> msgToRequeue,
                                    Map<Long, QueueEntry> msgToResend,
                                    boolean requeueIfUnabletoResend,
-                                   StoreContext storeContext)
+                                   TransactionLog txnLog)
     {
         _unacknowledgedMessageMap = unacknowledgedMessageMap;
         _msgToRequeue = msgToRequeue;
         _msgToResend = msgToResend;
         _requeueIfUnabletoResend = requeueIfUnabletoResend;
-        _storeContext = storeContext;
+        _transactionLog = txnLog;
     }
 
     public boolean callback(final long deliveryTag, QueueEntry message) throws AMQException
@@ -85,13 +84,14 @@
                 }
                 else
                 {
-                    message.discard(_storeContext);
+
+                    dequeueEntry(message);
                     _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message);
                 }
             }
             else
             {
-                message.discard(_storeContext);
+                dequeueEntry(message);
                 _log.warn("Message.queue is null and no DeadLetter Queue so dropping message:" + message);
             }
         }
@@ -100,6 +100,31 @@
         return false;
     }
 
+
+    private void dequeueEntry(final QueueEntry node)
+    {
+        Transaction txn = new AutoCommitTransaction(_transactionLog);
+        dequeueEntry(node, txn);
+    }
+
+    private void dequeueEntry(final QueueEntry node, Transaction txn)
+    {
+        txn.dequeue(node.getQueue(), node.getMessage(),
+                    new Transaction.Action()
+                    {
+
+                        public void postCommit()
+                        {
+                            node.discard();
+                        }
+
+                        public void onRollback()
+                        {
+
+                        }
+                    });
+    }
+
     public void visitComplete()
     {
         _unacknowledgedMessageMap.clear();

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java Sun Oct 11 15:10:43 2009
@@ -335,8 +335,7 @@
             String keystorePassword = serverConfig.getKeystorePassword();
             String certType = serverConfig.getCertType();
             SSLContextFactory sslFactory = null;
-            boolean isSsl = false;
-            
+
             if (!serverConfig.getSSLOnly())
             {
                 NetworkDriver driver = new MINANetworkDriver();

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java Sun Oct 11 15:10:43 2009
@@ -82,6 +82,6 @@
 
     public void release()
     {
-        _amqMessage.release();
+        //_amqMessage.release();
     }
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java Sun Oct 11 15:10:43 2009
@@ -21,14 +21,12 @@
 package org.apache.qpid.server.ack;
 
 import java.util.Collection;
-import java.util.List;
 import java.util.Set;
 import java.util.Map;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.server.txn.TransactionalContext;
 import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.StoreContext;
+
 
 public interface UnacknowledgedMessageMap
 {
@@ -50,18 +48,12 @@
 
     void collect(long deliveryTag, boolean multiple, Map<Long, QueueEntry> msgs);
 
-    boolean contains(long deliveryTag) throws AMQException;
-
     void remove(Map<Long,QueueEntry> msgs);
 
     QueueEntry remove(long deliveryTag);
 
-    public void drainTo(long deliveryTag, StoreContext storeContext) throws AMQException;
-    
     Collection<QueueEntry> cancelAllMessages();
 
-    void acknowledgeMessage(long deliveryTag, boolean multiple, TransactionalContext txnContext) throws AMQException;
-
     int size();
 
     void clear();
@@ -75,7 +67,6 @@
      */
     Set<Long> getDeliveryTags();
 
-    public long getUnacknowledgeBytes();
 }
 
 

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java Sun Oct 11 15:10:43 2009
@@ -20,20 +20,13 @@
  */
 package org.apache.qpid.server.ack;
 
-import org.apache.qpid.server.store.StoreContext;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.txn.TransactionalContext;
 
 public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
 {
@@ -61,19 +54,15 @@
         }
         else
         {
-            msgs.put(deliveryTag, get(deliveryTag));
+            final QueueEntry entry = get(deliveryTag);
+            if(entry != null)
+            {
+                msgs.put(deliveryTag, entry);
+            }
         }
 
     }
 
-    public boolean contains(long deliveryTag) throws AMQException
-    {
-        synchronized (_lock)
-        {
-            return _map.containsKey(deliveryTag);
-        }
-    }
-
     public void remove(Map<Long,QueueEntry> msgs)
     {
         synchronized (_lock)
@@ -135,15 +124,6 @@
         }
     }
 
-    public void acknowledgeMessage(long deliveryTag, boolean multiple, TransactionalContext txnContext)
-            throws AMQException
-    {
-        synchronized (_lock)
-        {
-            txnContext.acknowledgeMessage(deliveryTag, _lastDeliveryTag, multiple, this);
-        }
-    }
-
     public int size()
     {
         synchronized (_lock)
@@ -161,39 +141,6 @@
         }
     }
 
-    public void drainTo(long deliveryTag, StoreContext storeContext) throws AMQException
-   
-    {
-        synchronized (_lock)
-        {
-            Iterator<Map.Entry<Long, QueueEntry>> it = _map.entrySet().iterator();
-            while (it.hasNext())
-            {
-                Map.Entry<Long, QueueEntry> unacked = it.next();
-
-                if (unacked.getKey() > deliveryTag)
-                {
-                    //This should not occur now.
-                    throw new AMQException("UnacknowledgedMessageMap is out of order:" + unacked.getKey() +
-                                           " When deliveryTag is:" + deliveryTag + "ES:" + _map.entrySet().toString());
-                }
-
-                //Message has been ack so discard it. This will dequeue and decrement the reference.
-                unacked.getValue().discard(storeContext);
-
-                it.remove();
-
-                _unackedSize -= unacked.getValue().getMessage().getSize();
-
-
-                if (unacked.getKey() == deliveryTag)
-                {
-                    break;
-                }
-            }
-        }
-    }
-    
     public QueueEntry get(long key)
     {
         synchronized (_lock)
@@ -225,8 +172,4 @@
         }
     }
 
-    public long getUnacknowledgeBytes()
-    {
-        return _unackedSize;
-    }
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java Sun Oct 11 15:10:43 2009
@@ -25,7 +25,7 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.protocol.ExchangeInitialiser;
 import org.apache.qpid.server.queue.IncomingMessage;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Collection;
@@ -61,9 +61,9 @@
         return getExchange(new AMQShortString(exchangeName));
     }
 
-    public MessageStore getMessageStore()
+    public DurableConfigurationStore getDurableConfigurationStore()
     {
-        return _host.getMessageStore();
+        return _host.getDurableConfigurationStore();
     }
 
     public void registerExchange(Exchange exchange) throws AMQException
@@ -71,7 +71,7 @@
         _exchangeMap.put(exchange.getName(), exchange);
         if (exchange.isDurable())
         {
-            getMessageStore().createExchange(exchange);
+            getDurableConfigurationStore().createExchange(exchange);
         }
     }
 
@@ -98,7 +98,7 @@
         {
             if (e.isDurable())
             {
-                getMessageStore().removeExchange(e);
+                getDurableConfigurationStore().removeExchange(e);
             }
             e.close();
         }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java Sun Oct 11 15:10:43 2009
@@ -209,7 +209,7 @@
         }
     }
 
-    public ArrayList<AMQQueue> route(InboundMessage payload) throws AMQException
+    public ArrayList<AMQQueue> route(InboundMessage payload)
     {
 
         final AMQShortString routingKey = payload.getRoutingKey() == null ? AMQShortString.EMPTY_STRING : new AMQShortString(payload.getRoutingKey());

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java Sun Oct 11 15:10:43 2009
@@ -56,7 +56,7 @@
 
     void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException;
 
-    ArrayList<AMQQueue> route(InboundMessage message) throws AMQException;
+    ArrayList<AMQQueue> route(InboundMessage message);
 
 
     /**

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java Sun Oct 11 15:10:43 2009
@@ -199,7 +199,7 @@
         }
     }
 
-    public ArrayList<AMQQueue> route(InboundMessage payload) throws AMQException
+    public ArrayList<AMQQueue> route(InboundMessage payload)
     {
 
     

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java Sun Oct 11 15:10:43 2009
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -88,7 +88,7 @@
      * Creates a binding for a set of mappings. Those mappings whose value is
      * null or the empty string are assumed only to be required headers, with
      * no constraint on the value. Those with a non-null value are assumed to
-     * define a required match of value. 
+     * define a required match of value.
      * @param mappings the defined mappings this binding should use
      */
 
@@ -174,14 +174,21 @@
 
     private boolean or(final AMQMessageHeader headers)
     {
-        if(required.isEmpty() || passesRequiredOr(headers))
+        if(required.isEmpty())
         {
-            return ((!matches.isEmpty()) && passesMatchesOr(headers))
-                    || (required.isEmpty() && matches.isEmpty());
+            return  matches.isEmpty() || passesMatchesOr(headers);
         }
         else
         {
-            return true;
+            if(!passesRequiredOr(headers))
+            {
+                return !matches.isEmpty() && passesMatchesOr(headers);
+            }
+            else
+            {
+                return true;
+            }
+
         }
     }
 
@@ -190,7 +197,7 @@
         for(Map.Entry<String,Object> entry : matches.entrySet())
         {
             if(headers.containsHeader(entry.getKey())
-               || ((entry.getValue() == null && headers.getHeader(entry.getKey()) == null)
+               && ((entry.getValue() == null && headers.getHeader(entry.getKey()) == null)
                    || (entry.getValue().equals(headers.getHeader(entry.getKey())))))
             {
                 return true;

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java Sun Oct 11 15:10:43 2009
@@ -253,7 +253,7 @@
         }
     }
 
-    public ArrayList<AMQQueue> route(InboundMessage payload) throws AMQException
+    public ArrayList<AMQQueue> route(InboundMessage payload)
     {
         AMQMessageHeader header = payload.getMessageHeader();
         if (_logger.isDebugEnabled())

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java Sun Oct 11 15:10:43 2009
@@ -535,7 +535,7 @@
         return normalizedString;
     }
 
-    public ArrayList<AMQQueue> route(InboundMessage payload) throws AMQException
+    public ArrayList<AMQQueue> route(InboundMessage payload)
     {
 
         final AMQShortString routingKey = payload.getRoutingKey() == null

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java Sun Oct 11 15:10:43 2009
@@ -133,7 +133,7 @@
                 singleMessageCredit.useCreditForMessage(entry.getMessage());
                 if(entry.getMessage() instanceof AMQMessage)
                 {
-                    session.getProtocolOutputConverter().writeGetOk((AMQMessage)(entry.getMessage()), channel.getChannelId(),
+                    session.getProtocolOutputConverter().writeGetOk(entry, channel.getChannelId(),
                                                                             deliveryTag, queue.getMessageCount());
                 }
                 else

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java Sun Oct 11 15:10:43 2009
@@ -81,7 +81,7 @@
                 message = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
                 if(message != null)
                 {
-                    message.discard(channel.getStoreContext());
+                    message.discard();
                 }
                 //sendtoDeadLetterQueue(msg)
                 return;

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Sun Oct 11 15:10:43 2009
@@ -41,6 +41,7 @@
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclareBody>
@@ -60,11 +61,11 @@
 
     public void methodReceived(AMQStateManager stateManager, QueueDeclareBody body, int channelId) throws AMQException
     {
-        AMQProtocolSession session = stateManager.getProtocolSession();
+        final AMQProtocolSession session = stateManager.getProtocolSession();
         VirtualHost virtualHost = session.getVirtualHost();
         ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
         QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
-        MessageStore store = virtualHost.getMessageStore();
+        DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
 
 
         if (!body.getPassive())
@@ -115,6 +116,25 @@
                         store.createQueue(queue, body.getArguments());
                     }
                     queueRegistry.registerQueue(queue);
+                    if(queue.isExclusive()  && !queue.isAutoDelete())
+                    {
+                        final AMQQueue q = queue;
+                        queue.setExclusiveOwner(session);
+                        final AMQProtocolSession.Task sessionCloseTask = new AMQProtocolSession.Task()
+                        {
+                            public void doTask(AMQProtocolSession session) throws AMQException
+                            {
+                                q.setExclusiveOwner(null);
+                            }
+                        };
+                        session.addSessionCloseTask(sessionCloseTask);
+                        queue.addQueueDeleteTask(new AMQQueue.Task() {
+                            public void doTask(AMQQueue queue) throws AMQException
+                            {
+                                session.removeSessionCloseTask(sessionCloseTask);
+                            }
+                        });
+                    }
                     if (autoRegister)
                     {
                         Exchange defaultExchange = exchangeRegistry.getDefaultExchange();
@@ -125,16 +145,17 @@
                 }
             }
             else if (queue.getPrincipalHolder() != null
-                     && queue.getPrincipalHolder().getPrincipal() != null
-                     && queue.getPrincipalHolder().getPrincipal().getName() != null
-                     && !session.getContextKey().equals(new AMQShortString(queue.getPrincipalHolder().getPrincipal().getName())))
+                      && queue.getPrincipalHolder().getPrincipal() != null
+                      && queue.getPrincipalHolder().getPrincipal().getName() != null
+                      && (!queue.getPrincipalHolder().getPrincipal().getName().equals(session.getPrincipal().getName())
+                          || ((!body.getPassive() && queue.getExclusiveOwner() != null && queue.getExclusiveOwner() != session))))
             {
                 throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + queueName + "'),"
                                                                            + " as exclusive queue with same name "
                                                                            + "declared on another client ID('"
                                                                            + queue.getPrincipalHolder().getPrincipal().getName() + "')");
-            }
 
+            }
             AMQChannel channel = session.getChannel(channelId);
 
             if (channel == null)
@@ -201,7 +222,7 @@
                 }
             });
         }// if exclusive and not durable
-        
+
         return queue;
     }
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java Sun Oct 11 15:10:43 2009
@@ -31,6 +31,7 @@
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.security.access.Permission;
@@ -62,7 +63,7 @@
         AMQProtocolSession session = stateManager.getProtocolSession();
         VirtualHost virtualHost = session.getVirtualHost();
         QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
-        MessageStore store = virtualHost.getMessageStore();
+        DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
 
         AMQQueue queue;
         if (body.getQueue() == null)

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java Sun Oct 11 15:10:43 2009
@@ -33,7 +33,6 @@
 import org.apache.qpid.server.state.StateAwareMethodListener;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.security.access.Permission;
 
 public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBody>
 {
@@ -106,7 +105,7 @@
                     throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
                 }
 
-                long purged = queue.clearQueue(channel.getStoreContext());
+                long purged = queue.clearQueue();
 
 
                 if(!body.getNowait())

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java Sun Oct 11 15:10:43 2009
@@ -26,7 +26,6 @@
 import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
@@ -62,22 +61,16 @@
             {
                 throw body.getChannelNotFoundException(channelId);
             }
-            StoreContext.setCurrentContext(channel.getStoreContext());
             channel.commit();
 
             MethodRegistry methodRegistry = session.getMethodRegistry();
             AMQMethodBody responseBody = methodRegistry.createTxCommitOkBody();
             session.writeFrame(responseBody.generateFrame(channelId));
-            
-            channel.processReturns();
+                        
         }
         catch (AMQException e)
         {
             throw body.getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage());
         }
-        finally
-        {
-            StoreContext.clearCurrentContext();
-        }
     }
 }

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java Sun Oct 11 15:10:43 2009
@@ -44,9 +44,9 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, TxRollbackBody body, int channelId) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, TxRollbackBody body, final int channelId) throws AMQException
     {
-        AMQProtocolSession session = stateManager.getProtocolSession();
+        final AMQProtocolSession session = stateManager.getProtocolSession();
 
         try
         {
@@ -57,17 +57,22 @@
                 throw body.getChannelNotFoundException(channelId);
             }
 
-            channel.rollback();
 
-            MethodRegistry methodRegistry = session.getMethodRegistry();
-            AMQMethodBody responseBody = methodRegistry.createTxRollbackOkBody();
-            session.writeFrame(responseBody.generateFrame(channelId));
 
+            final MethodRegistry methodRegistry = session.getMethodRegistry();
+            final AMQMethodBody responseBody = methodRegistry.createTxRollbackOkBody();
+
+            Runnable task = new Runnable()
+            {
+
+                public void run()
+                {
+                    session.writeFrame(responseBody.generateFrame(channelId));
+                }
+            };
+
+            channel.rollback(task);
             
-            //Now resend all the unacknowledged messages back to the original subscribers.
-            //(Must be done after the TxnRollback-ok response).
-            // Why, are we not allowed to send messages back to client before the ok method?
-            channel.resend(false);
         }
         catch (AMQException e)
         {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org