You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2006/10/30 20:27:56 UTC

svn commit: r469242 [1/2] - in /incubator/qpid/trunk/qpid/cpp: src/qpid/broker/ test/unit/qpid/broker/ test/unit/qpid/concurrent/

Author: gsim
Date: Mon Oct 30 11:27:54 2006
New Revision: 469242

URL: http://svn.apache.org/viewvc?view=rev&rev=469242
Log:
Initial implementation for tx class.


Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Deliverable.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Prefetch.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Prefetch.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TransactionalStore.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOp.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h   (with props)
    incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/test/unit/qpid/concurrent/
    incubator/qpid/trunk/qpid/cpp/test/unit/qpid/concurrent/APRBaseTest.cpp   (with props)
Removed:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Router.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Router.h
    incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/RouterTest.cpp
Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Configuration.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h
    incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ChannelTest.cpp
    incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/ExchangeTest.cpp

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.cpp?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.cpp Mon Oct 30 11:27:54 2006
@@ -0,0 +1,46 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/broker/AccumulatedAck.h"
+
+using std::less_equal;
+using std::bind2nd;
+using namespace qpid::broker;
+
+void AccumulatedAck::update(u_int64_t tag, bool multiple){
+    if(multiple){
+        if(tag > range) range = tag;
+        //else don't care, it is already counted
+    }else if(tag < range){
+        individual.push_back(tag);
+    }
+}
+
+void AccumulatedAck::consolidate(){
+    individual.sort();
+    //remove any individual tags that are covered by range
+    individual.remove_if(bind2nd(less_equal<u_int64_t>(), range));
+}
+
+void AccumulatedAck::clear(){
+    range = 0;
+    individual.clear();
+}
+
+bool AccumulatedAck::covers(u_int64_t tag) const{
+    return tag < range || find(individual.begin(), individual.end(), tag) != individual.end();
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.h?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.h Mon Oct 30 11:27:54 2006
@@ -0,0 +1,52 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _AccumulatedAck_
+#define _AccumulatedAck_
+
+#include <algorithm>
+#include <functional>
+#include <list>
+
+namespace qpid {
+    namespace broker {
+        /**
+         * Keeps an accumulated record of acked messages (by delivery
+         * tag).
+         */
+        struct AccumulatedAck{
+            /**
+             * If not zero, then everything up to this value has been
+             * acked.
+             */
+            u_int64_t range;
+            /**
+             * List of individually acked messages that are not
+             * included in the range marked by 'range'.
+             */
+            std::list<u_int64_t> individual;
+
+            void update(u_int64_t tag, bool multiple);
+            void consolidate();
+            void clear();
+            bool covers(u_int64_t tag) const;
+        };
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp Mon Oct 30 11:27:54 2006
@@ -21,6 +21,8 @@
 #include <sstream>
 #include <assert.h>
 
+using std::mem_fun_ref;
+using std::bind2nd;
 using namespace qpid::broker;
 using namespace qpid::framing;
 using namespace qpid::concurrent;
@@ -29,14 +31,17 @@
 Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) :
     id(_id), 
     out(_out), 
-    deliveryTag(1),
+    currentDeliveryTag(1),
     transactional(false),
     prefetchSize(0),
     prefetchCount(0),
-    outstandingSize(0),
-    outstandingCount(0),
     framesize(_framesize),
-    tagGenerator("sgen"){}
+    tagGenerator("sgen"),
+    store(0),
+    messageBuilder(this){
+
+    outstanding.reset();
+}
 
 Channel::~Channel(){
 }
@@ -86,30 +91,36 @@
 }
 
 void Channel::commit(){
-
+    TxAck txAck(accumulatedAck, unacked);
+    txBuffer.enlist(&txAck);
+    if(txBuffer.prepare(store)){
+        txBuffer.commit();
+    }
+    accumulatedAck.clear();
 }
 
 void Channel::rollback(){
-
+    txBuffer.rollback();
+    accumulatedAck.clear();
 }
 
 void Channel::deliver(Message::shared_ptr& msg, string& consumerTag, Queue::shared_ptr& queue, bool ackExpected){
     Locker locker(deliveryLock);
 
-    u_int64_t myDeliveryTag = deliveryTag++;
+    u_int64_t deliveryTag = currentDeliveryTag++;
     if(ackExpected){
-        unacknowledged.push_back(AckRecord(msg, queue, consumerTag, myDeliveryTag));
-        outstandingSize += msg->contentSize();
-        outstandingCount++;
+        unacked.push_back(DeliveryRecord(msg, queue, consumerTag, deliveryTag));
+        outstanding.size += msg->contentSize();
+        outstanding.count++;
     }
     //send deliver method, header and content(s)
-    msg->deliver(out, id, consumerTag, myDeliveryTag, framesize);
+    msg->deliver(out, id, consumerTag, deliveryTag, framesize);
 }
 
 bool Channel::checkPrefetch(Message::shared_ptr& msg){
     Locker locker(deliveryLock);
-    bool countOk = !prefetchCount || prefetchCount > unacknowledged.size();
-    bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstandingSize || unacknowledged.empty();
+    bool countOk = !prefetchCount || prefetchCount > unacked.size();
+    bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty();
     return countOk && sizeOk;
 }
 
@@ -144,43 +155,66 @@
     if(blocked) queue->dispatch();
 }
 
-void Channel::checkMessage(const std::string& text){
-    if(!message.get()){
-        THROW_QPID_ERROR(PROTOCOL_ERROR + 504, text);
-    }
+void Channel::handlePublish(Message* _message, Exchange* _exchange){
+    Message::shared_ptr message(_message);
+    exchange = _exchange;
+    messageBuilder.initialise(message);
+}
+
+void Channel::handleHeader(AMQHeaderBody::shared_ptr header){
+    messageBuilder.setHeader(header);
+}
+
+void Channel::handleContent(AMQContentBody::shared_ptr content){
+    messageBuilder.addContent(content);
 }
 
-void Channel::handlePublish(Message* msg){
-    if(message.get()){
-        THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got publish before previous content was completed.");
+void Channel::complete(Message::shared_ptr& msg){
+    if(exchange){
+        if(transactional){
+            TxPublish* deliverable = new TxPublish(msg);
+            exchange->route(*deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders()));
+            txBuffer.enlist(new DeletingTxOp(deliverable));
+        }else{
+            DeliverableMessage deliverable(msg);
+            exchange->route(deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders()));
+        }
+        exchange = 0;
+    }else{
+        std::cout << "Exchange not known in Channel::complete(Message::shared_ptr&)" << std::endl;
     }
-    message = Message::shared_ptr(msg);
 }
 
-void Channel::ack(u_int64_t _deliveryTag, bool multiple){
-    Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery
-    
-    ack_iterator i = find_if(unacknowledged.begin(), unacknowledged.end(), MatchAck(_deliveryTag));
-    if(i == unacknowledged.end()){
-        throw InvalidAckException();
-    }else if(multiple){        
-        unacknowledged.erase(unacknowledged.begin(), ++i);
-        //recompute prefetch outstanding (note: messages delivered through get are ignored)
-        CalculatePrefetch calc(for_each(unacknowledged.begin(), unacknowledged.end(), CalculatePrefetch()));
-        outstandingSize = calc.getSize();
-        outstandingCount = calc.getCount();
+void Channel::ack(u_int64_t deliveryTag, bool multiple){
+    if(transactional){
+        accumulatedAck.update(deliveryTag, multiple);
+        //TODO: I think the outstanding prefetch size & count should be updated at this point...
+        //TODO: ...this may then necessitate dispatching to consumers
     }else{
-        if(!i->pull){
-            outstandingSize -= i->msg->contentSize();
-            outstandingCount--;
+        Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery
+    
+        ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), deliveryTag));
+        if(i == unacked.end()){
+            throw InvalidAckException();
+        }else if(multiple){     
+            ack_iterator end = ++i;
+            for_each(unacked.begin(), end, mem_fun_ref(&DeliveryRecord::discard));
+            unacked.erase(unacked.begin(), end);
+
+            //recalculate the prefetch:
+            outstanding.reset();
+            for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::addTo), &outstanding));
+        }else{
+            i->discard();
+            i->subtractFrom(&outstanding);
+            unacked.erase(i);        
         }
-        unacknowledged.erase(i);        
-    }
 
-    //if the prefetch limit had previously been reached, there may
-    //be messages that can be now be delivered
-    for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){
-        j->second->requestDispatch();
+        //if the prefetch limit had previously been reached, there may
+        //be messages that can be now be delivered
+        for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){
+            j->second->requestDispatch();
+        }
     }
 }
 
@@ -188,14 +222,12 @@
     Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery
 
     if(requeue){
-        outstandingSize = 0;
-        outstandingCount = 0;
-        ack_iterator start(unacknowledged.begin());
-        ack_iterator end(unacknowledged.end());
-        for_each(start, end, Requeue());
-        unacknowledged.erase(start, end);
+        outstanding.reset();
+        std::list<DeliveryRecord> copy = unacked;
+        unacked.clear();
+        for_each(copy.begin(), copy.end(), mem_fun_ref(&DeliveryRecord::requeue));
     }else{
-        for_each(unacknowledged.begin(), unacknowledged.end(), Redeliver(this));        
+        for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::redeliver), this));        
     }
 }
 
@@ -203,10 +235,10 @@
     Message::shared_ptr msg = queue->dequeue();
     if(msg){
         Locker locker(deliveryLock);
-        u_int64_t myDeliveryTag = deliveryTag++;
+        u_int64_t myDeliveryTag = currentDeliveryTag++;
         msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize);
         if(ackExpected){
-            unacknowledged.push_back(AckRecord(msg, queue, myDeliveryTag));
+            unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
         }
         return true;
     }else{
@@ -214,43 +246,6 @@
     }
 }
 
-Channel::MatchAck::MatchAck(u_int64_t _tag) : tag(_tag) {}
-
-bool Channel::MatchAck::operator()(AckRecord& record) const{
-    return tag == record.deliveryTag;
-}
-
-void Channel::Requeue::operator()(AckRecord& record) const{
-    record.msg->redeliver();
-    record.queue->deliver(record.msg);
-}
-
-Channel::Redeliver::Redeliver(Channel* const _channel) : channel(_channel) {}
-
-void Channel::Redeliver::operator()(AckRecord& record) const{
-    if(record.pull){
-        //if message was originally sent as response to get, we must requeue it
-        record.msg->redeliver();
-        record.queue->deliver(record.msg);
-    }else{
-        record.msg->deliver(channel->out, channel->id, record.consumerTag, record.deliveryTag, channel->framesize);
-    }
-}
-
-Channel::CalculatePrefetch::CalculatePrefetch() : size(0){}
-
-void Channel::CalculatePrefetch::operator()(AckRecord& record){
-    if(!record.pull){
-        //ignore messages that were sent in response to get when calculating prefetch
-        size += record.msg->contentSize();
-        count++;
-    }
-}
-
-u_int32_t Channel::CalculatePrefetch::getSize(){
-    return size;
-}
-
-u_int16_t Channel::CalculatePrefetch::getCount(){
-    return count;
+void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag){
+    msg->deliver(out, id, consumerTag, deliveryTag, framesize);
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.h?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.h Mon Oct 30 11:27:54 2006
@@ -19,17 +19,29 @@
 #define _Channel_
 
 #include <algorithm>
+#include <functional>
+#include <list>
 #include <map>
-#include "qpid/framing/AMQContentBody.h"
-#include "qpid/framing/AMQHeaderBody.h"
-#include "qpid/framing/BasicPublishBody.h"
+#include "qpid/broker/AccumulatedAck.h"
 #include "qpid/broker/Binding.h"
 #include "qpid/broker/Consumer.h"
+#include "qpid/broker/DeletingTxOp.h"
+#include "qpid/broker/DeliverableMessage.h"
+#include "qpid/broker/DeliveryRecord.h"
 #include "qpid/broker/Message.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/broker/MessageBuilder.h"
 #include "qpid/broker/NameGenerator.h"
-#include "qpid/framing/OutputHandler.h"
+#include "qpid/broker/Prefetch.h"
 #include "qpid/broker/Queue.h"
+#include "qpid/broker/TransactionalStore.h"
+#include "qpid/broker/TxAck.h"
+#include "qpid/broker/TxBuffer.h"
+#include "qpid/broker/TxPublish.h"
+#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/framing/OutputHandler.h"
+#include "qpid/framing/AMQContentBody.h"
+#include "qpid/framing/AMQHeaderBody.h"
+#include "qpid/framing/BasicPublishBody.h"
 
 namespace qpid {
     namespace broker {
@@ -37,8 +49,7 @@
          * Maintains state for an AMQP channel. Handles incoming and
          * outgoing messages for that channel.
          */
-        class Channel{
-        private:
+        class Channel : private MessageBuilder::CompletionHandler{
             class ConsumerImpl : public virtual Consumer{
                 Channel* parent;
                 string tag;
@@ -54,92 +65,29 @@
             };
 
             typedef std::map<string,ConsumerImpl*>::iterator consumer_iterator; 
-
-            struct AckRecord{
-                Message::shared_ptr msg;
-                Queue::shared_ptr queue;
-                string consumerTag;
-                u_int64_t deliveryTag;
-                bool pull;
-
-                AckRecord(Message::shared_ptr _msg, 
-                          Queue::shared_ptr _queue, 
-                          const string _consumerTag, 
-                          const u_int64_t _deliveryTag) : msg(_msg), 
-                                                          queue(_queue), 
-                                                          consumerTag(_consumerTag),
-                                                          deliveryTag(_deliveryTag),
-                                                          pull(false){}
-
-                AckRecord(Message::shared_ptr _msg, 
-                          Queue::shared_ptr _queue, 
-                          const u_int64_t _deliveryTag) : msg(_msg), 
-                                                          queue(_queue), 
-                                                          consumerTag(""),
-                                                          deliveryTag(_deliveryTag),
-                                                          pull(true){}
-            };
-
-            typedef std::vector<AckRecord>::iterator ack_iterator; 
-
-            class MatchAck{
-                const u_int64_t tag;
-            public:
-                MatchAck(u_int64_t tag);
-                bool operator()(AckRecord& record) const;
-            };
-
-            class Requeue{
-            public:
-                void operator()(AckRecord& record) const;
-            };
-
-            class Redeliver{
-                Channel* const channel;
-            public:
-                Redeliver(Channel* const channel);
-                void operator()(AckRecord& record) const;
-            };
-
-            class CalculatePrefetch{
-                u_int32_t size;
-                u_int16_t count;
-            public:
-                CalculatePrefetch();
-                void operator()(AckRecord& record);
-                u_int32_t getSize();
-                u_int16_t getCount();
-            };
-
             const int id;
             qpid::framing::OutputHandler* out;
-            u_int64_t deliveryTag;
+            u_int64_t currentDeliveryTag;
             Queue::shared_ptr defaultQueue;
             bool transactional;
             std::map<string, ConsumerImpl*> consumers;
             u_int32_t prefetchSize;    
             u_int16_t prefetchCount;    
-            u_int32_t outstandingSize;    
-            u_int16_t outstandingCount;    
+            Prefetch outstanding;
             u_int32_t framesize;
-            Message::shared_ptr message;
             NameGenerator tagGenerator;
-            std::vector<AckRecord> unacknowledged;
+            std::list<DeliveryRecord> unacked;
             qpid::concurrent::MonitorImpl deliveryLock;
+            TxBuffer txBuffer;
+            AccumulatedAck accumulatedAck;
+            TransactionalStore* store;
+            MessageBuilder messageBuilder;//builder for in-progress message
+            Exchange* exchange;//exchange to which any in-progress message was published to
 
+            virtual void complete(Message::shared_ptr& msg);
             void deliver(Message::shared_ptr& msg, string& tag, Queue::shared_ptr& queue, bool ackExpected);            
-            void checkMessage(const std::string& text);
-            bool checkPrefetch(Message::shared_ptr& msg);
             void cancel(consumer_iterator consumer);
-
-            template<class Operation> Operation processMessage(Operation route){
-                if(message->isComplete()){
-                    route(message);
-                    message.reset();
-                }
-                return route;
-            }
-
+            bool checkPrefetch(Message::shared_ptr& msg);
         
         public:
             Channel(qpid::framing::OutputHandler* out, int id, u_int32_t framesize);
@@ -158,37 +106,10 @@
             void rollback();
             void ack(u_int64_t deliveryTag, bool multiple);
             void recover(bool requeue);
-
-            /**
-             * Handles the initial publish request though a
-             * channel. The header and (if applicable) content will be
-             * accumulated through calls to handleHeader() and
-             * handleContent()
-             */
-            void handlePublish(Message* msg);
-
-            /**
-             * A template method that handles a received header and if
-             * there is no content routes it using the functor passed
-             * in.
-             */
-            template<class Operation> Operation handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header, Operation route){
-                checkMessage("Invalid message sequence: got header before publish.");
-                message->setHeader(header);
-                return processMessage(route);
-            }
-
-            /**
-             * A template method that handles a received content and
-             * if this completes the message, routes it using the
-             * functor passed in.
-             */
-            template<class Operation> Operation handleContent(qpid::framing::AMQContentBody::shared_ptr content, Operation route){
-                checkMessage("Invalid message sequence: got content before publish.");
-                message->addContent(content);
-                return processMessage(route);
-            }
-
+            void deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag);            
+            void handlePublish(Message* msg, Exchange* exchange);
+            void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
+            void handleContent(qpid::framing::AMQContentBody::shared_ptr content);
         };
 
         struct InvalidAckException{};

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Configuration.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Configuration.cpp?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Configuration.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Configuration.cpp Mon Oct 30 11:27:54 2006
@@ -191,6 +191,8 @@
     return false;
 }
 
-void Configuration::BoolOption::setValue(const std::string& _value){
-    value = strcasecmp(_value.c_str(), "true") == 0;
+void Configuration::BoolOption::setValue(const std::string& /*not required*/){
+    //BoolOptions have no value. The fact that the option is specified
+    //implies the value is true.
+    value = true;
 }

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.cpp?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.cpp Mon Oct 30 11:27:54 2006
@@ -0,0 +1,42 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/broker/DeletingTxOp.h"
+
+using namespace qpid::broker;
+
+DeletingTxOp::DeletingTxOp(TxOp* const _delegate) : delegate(_delegate){}
+
+bool DeletingTxOp::prepare() throw(){
+    return delegate && delegate->prepare();
+}
+
+void DeletingTxOp::commit() throw(){
+    if(delegate){
+        delegate->commit();
+        delete delegate;
+        delegate = 0;
+    }
+}
+
+void DeletingTxOp::rollback() throw(){
+    if(delegate){
+        delegate->rollback();
+        delete delegate;
+        delegate = 0;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.h?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.h Mon Oct 30 11:27:54 2006
@@ -0,0 +1,42 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _DeletingTxOp_
+#define _DeletingTxOp_
+
+#include "qpid/broker/TxOp.h"
+
+namespace qpid {
+    namespace broker {
+        /**
+         * TxOp wrapper that will delegate calls & delete the object
+         * to which it delegates after completion of the transaction.
+         */
+        class DeletingTxOp : public virtual TxOp{
+            TxOp* delegate;
+        public:
+            DeletingTxOp(TxOp* const delegate);
+            virtual bool prepare() throw();
+            virtual void commit()  throw();
+            virtual void rollback()  throw();
+            virtual ~DeletingTxOp(){}
+        };
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeletingTxOp.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Deliverable.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Deliverable.h?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Deliverable.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Deliverable.h Mon Oct 30 11:27:54 2006
@@ -0,0 +1,34 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Deliverable_
+#define _Deliverable_
+
+#include "qpid/broker/Queue.h"
+
+namespace qpid {
+    namespace broker {
+        class Deliverable{
+        public:
+            virtual void deliverTo(Queue::shared_ptr& queue) = 0;
+            virtual ~Deliverable(){}
+        };
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Deliverable.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp Mon Oct 30 11:27:54 2006
@@ -0,0 +1,30 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/broker/DeliverableMessage.h"
+
+using namespace qpid::broker;
+
+DeliverableMessage::DeliverableMessage(Message::shared_ptr& _msg) : msg(_msg)
+{
+}
+
+void DeliverableMessage::deliverTo(Queue::shared_ptr& queue)
+{
+    queue->deliver(msg);    
+}
+

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h Mon Oct 30 11:27:54 2006
@@ -0,0 +1,38 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _DeliverableMessage_
+#define _DeliverableMessage_
+
+#include "qpid/broker/Deliverable.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Queue.h"
+
+namespace qpid {
+    namespace broker {
+        class DeliverableMessage : public Deliverable{
+            Message::shared_ptr msg;
+        public:
+            DeliverableMessage(Message::shared_ptr& msg);
+            virtual void deliverTo(Queue::shared_ptr& queue);
+            virtual ~DeliverableMessage(){}
+        };
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliverableMessage.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Mon Oct 30 11:27:54 2006
@@ -0,0 +1,87 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/broker/DeliveryRecord.h"
+#include "qpid/broker/Channel.h"
+
+using namespace qpid::broker;
+
+DeliveryRecord::DeliveryRecord(Message::shared_ptr _msg, 
+                               Queue::shared_ptr _queue, 
+                               const string _consumerTag, 
+                               const u_int64_t _deliveryTag) : msg(_msg), 
+                                                               queue(_queue), 
+                                                               consumerTag(_consumerTag),
+                                                               deliveryTag(_deliveryTag),
+                                                               pull(false){}
+
+DeliveryRecord::DeliveryRecord(Message::shared_ptr _msg, 
+                               Queue::shared_ptr _queue, 
+                               const u_int64_t _deliveryTag) : msg(_msg), 
+                                                               queue(_queue), 
+                                                               consumerTag(""),
+                                                               deliveryTag(_deliveryTag),
+                                                               pull(true){}
+
+
+void DeliveryRecord::discard() const{
+    queue->dequeue(msg, 0);
+}
+
+bool DeliveryRecord::matches(u_int64_t tag) const{
+    return deliveryTag == tag;
+}
+
+bool DeliveryRecord::coveredBy(const AccumulatedAck* const range) const{
+    return range->covers(deliveryTag);
+}
+
+void DeliveryRecord::discardIfCoveredBy(const AccumulatedAck* const range) const{
+    if(coveredBy(range)) discard();
+}
+
+void DeliveryRecord::redeliver(Channel* const channel) const{
+    if(pull){
+        //if message was originally sent as response to get, we must requeue it
+        requeue();
+    }else{
+        channel->deliver(msg, consumerTag, deliveryTag);
+    }
+}
+
+void DeliveryRecord::requeue() const{
+    msg->redeliver();
+    queue->deliver(msg);
+}
+
+void DeliveryRecord::addTo(Prefetch* const prefetch) const{
+    if(!pull){
+        //ignore 'pulled' messages (i.e. those that were sent in
+        //response to get) when calculating prefetch
+        prefetch->size += msg->contentSize();
+        prefetch->count++;
+    }    
+}
+
+void DeliveryRecord::subtractFrom(Prefetch* const prefetch) const{
+    if(!pull){
+        //ignore 'pulled' messages (i.e. those that were sent in
+        //response to get) when calculating prefetch
+        prefetch->size -= msg->contentSize();
+        prefetch->count--;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h Mon Oct 30 11:27:54 2006
@@ -0,0 +1,61 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _DeliveryRecord_
+#define _DeliveryRecord_
+
+#include <algorithm>
+#include <list>
+#include "qpid/broker/AccumulatedAck.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Prefetch.h"
+#include "qpid/broker/Queue.h"
+
+namespace qpid {
+    namespace broker {
+        class Channel;
+
+        /**
+         * Record of a delivery for which an ack is outstanding.
+         */
+        class DeliveryRecord{
+            mutable Message::shared_ptr msg;
+            mutable Queue::shared_ptr queue;
+            string consumerTag;
+            u_int64_t deliveryTag;
+            bool pull;
+
+        public:
+            DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const string consumerTag, const u_int64_t deliveryTag);
+            DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const u_int64_t deliveryTag);
+            
+            void discard() const;
+            bool matches(u_int64_t tag) const;
+            bool coveredBy(const AccumulatedAck* const range) const;
+            void discardIfCoveredBy(const AccumulatedAck* const range) const;
+            void requeue() const;
+            void redeliver(Channel* const) const;
+            void addTo(Prefetch* const prefetch) const;
+            void subtractFrom(Prefetch* const prefetch) const;
+        };
+
+        typedef std::list<DeliveryRecord>::iterator ack_iterator; 
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp Mon Oct 30 11:27:54 2006
@@ -51,12 +51,12 @@
     lock.release();
 }
 
-void DirectExchange::route(Message::shared_ptr& msg, const string& routingKey, FieldTable* /*args*/){
+void DirectExchange::route(Deliverable& msg, const string& routingKey, FieldTable* /*args*/){
     lock.acquire();
     std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
     int count(0);
     for(std::vector<Queue::shared_ptr>::iterator i = queues.begin(); i != queues.end(); i++, count++){
-        (*i)->deliver(msg);
+        msg.deliverTo(*i);
     }
     if(!count){
         std::cout << "WARNING: DirectExchange " << getName() << " could not route message with key " << routingKey << std::endl;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h Mon Oct 30 11:27:54 2006
@@ -41,7 +41,7 @@
 
         virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, qpid::framing::FieldTable* args);
 
-        virtual void route(Message::shared_ptr& msg, const std::string& routingKey, qpid::framing::FieldTable* args);
+        virtual void route(Deliverable& msg, const std::string& routingKey, qpid::framing::FieldTable* args);
 
         virtual ~DirectExchange();
     };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h Mon Oct 30 11:27:54 2006
@@ -18,9 +18,9 @@
 #ifndef _Exchange_
 #define _Exchange_
 
-#include "qpid/framing/FieldTable.h"
-#include "qpid/broker/Message.h"
+#include "qpid/broker/Deliverable.h"
 #include "qpid/broker/Queue.h"
+#include "qpid/framing/FieldTable.h"
 
 namespace qpid {
 namespace broker {
@@ -32,7 +32,7 @@
         std::string getName() { return name; }
         virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args) = 0;
         virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args) = 0;
-        virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args) = 0;
+        virtual void route(Deliverable& msg, const string& routingKey, qpid::framing::FieldTable* args) = 0;
     };
 }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp Mon Oct 30 11:27:54 2006
@@ -44,10 +44,10 @@
     }
 }
 
-void FanOutExchange::route(Message::shared_ptr& msg, const string& /*routingKey*/, FieldTable* /*args*/){
+void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, FieldTable* /*args*/){
     Locker locker(lock);
     for(Queue::vector::iterator i = bindings.begin(); i != bindings.end(); ++i){
-        (*i)->deliver(msg);
+        msg.deliverTo(*i);
     }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h Mon Oct 30 11:27:54 2006
@@ -42,7 +42,7 @@
 
     virtual void unbind(Queue::shared_ptr queue, const std::string& routingKey, qpid::framing::FieldTable* args);
 
-    virtual void route(Message::shared_ptr& msg, const std::string& routingKey, qpid::framing::FieldTable* args);
+    virtual void route(Deliverable& msg, const std::string& routingKey, qpid::framing::FieldTable* args);
 
     virtual ~FanOutExchange();
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp Mon Oct 30 11:27:54 2006
@@ -58,10 +58,10 @@
 }
 
 
-void HeadersExchange::route(Message::shared_ptr& msg, const string& /*routingKey*/, FieldTable* args){
+void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, FieldTable* args){
     Locker locker(lock);;
     for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) {
-        if (match(i->first, *args)) i->second->deliver(msg);
+        if (match(i->first, *args)) msg.deliverTo(i->second);
     }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h Mon Oct 30 11:27:54 2006
@@ -45,7 +45,7 @@
 
     virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
 
-    virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args);
+    virtual void route(Deliverable& msg, const string& routingKey, qpid::framing::FieldTable* args);
 
     virtual ~HeadersExchange();
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Mon Oct 30 11:27:54 2006
@@ -17,7 +17,6 @@
  */
 #include "qpid/concurrent/MonitorImpl.h"
 #include "qpid/broker/Message.h"
-#include "qpid/broker/ExchangeRegistry.h"
 #include <iostream>
 
 using namespace boost;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Mon Oct 30 11:27:54 2006
@@ -19,16 +19,16 @@
 #define _Message_
 
 #include <boost/shared_ptr.hpp>
+#include "qpid/broker/ConnectionToken.h"
+#include "qpid/broker/TxBuffer.h"
 #include "qpid/framing/AMQContentBody.h"
 #include "qpid/framing/AMQHeaderBody.h"
 #include "qpid/framing/BasicHeaderProperties.h"
 #include "qpid/framing/BasicPublishBody.h"
-#include "qpid/broker/ConnectionToken.h"
 #include "qpid/framing/OutputHandler.h"
 
 namespace qpid {
     namespace broker {
-        class ExchangeRegistry;
  
         /**
          * Represents an AMQP message, i.e. a header body, a list of
@@ -48,6 +48,7 @@
             qpid::framing::AMQHeaderBody::shared_ptr header;
             content_list content;
             u_int64_t size;
+            TxBuffer* tx;
 
             void sendContent(qpid::framing::OutputHandler* out, 
                              int channel, u_int32_t framesize);
@@ -79,8 +80,9 @@
             qpid::framing::BasicHeaderProperties* getHeaderProperties();
             const string& getRoutingKey() const { return routingKey; }
             const string& getExchange() const { return exchange; }
-            u_int64_t contentSize() const{ return size; }
-
+            u_int64_t contentSize() const { return size; }
+            TxBuffer* getTx() const { return tx; }
+            void setTx(TxBuffer* _tx) { tx = _tx; }
         };
     }
 }

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp Mon Oct 30 11:27:54 2006
@@ -0,0 +1,53 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/broker/MessageBuilder.h"
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+MessageBuilder::MessageBuilder(CompletionHandler* _handler) : handler(_handler) {}
+
+void MessageBuilder::route(){
+    if(message->isComplete()){
+        if(handler) handler->complete(message);
+        message.reset();
+    }
+}
+
+void MessageBuilder::initialise(Message::shared_ptr& msg){
+    if(message.get()){
+        THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got publish before previous content was completed.");
+    }
+    message = msg;
+}
+
+void MessageBuilder::setHeader(AMQHeaderBody::shared_ptr& header){
+    if(!message.get()){
+        THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before publish.");
+    }
+    message->setHeader(header);
+    route();
+}
+
+void MessageBuilder::addContent(AMQContentBody::shared_ptr& content){
+    if(!message.get()){
+        THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before publish.");
+    }
+    message->addContent(content);
+    route();
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h Mon Oct 30 11:27:54 2006
@@ -0,0 +1,51 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _MessageBuilder_
+#define _MessageBuilder_
+
+#include "qpid/QpidError.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/broker/Message.h"
+#include "qpid/framing/AMQContentBody.h"
+#include "qpid/framing/AMQHeaderBody.h"
+#include "qpid/framing/BasicPublishBody.h"
+
+namespace qpid {
+    namespace broker {
+        class MessageBuilder{
+        public:
+            class CompletionHandler{
+            public:
+                virtual void complete(Message::shared_ptr&) = 0;
+                virtual ~CompletionHandler(){}
+            };
+            MessageBuilder(CompletionHandler* _handler);
+            void initialise(Message::shared_ptr& msg);
+            void setHeader(qpid::framing::AMQHeaderBody::shared_ptr& header);
+            void addContent(qpid::framing::AMQContentBody::shared_ptr& content);
+        private:
+            Message::shared_ptr message;
+            CompletionHandler* handler;
+
+            void route();
+        };
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h Mon Oct 30 11:27:54 2006
@@ -0,0 +1,71 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _MessageStore_
+#define _MessageStore_
+
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/TransactionalStore.h"
+
+namespace qpid {
+    namespace broker {
+        /**
+         * An abstraction of the persistent storage for messages.
+         */
+        class MessageStore : public TransactionalStore{
+        public:
+            /**
+             * Enqueues a message, storing the message if it has not
+             * been previously stored and recording that the given
+             * message is on the given queue.
+             * 
+             * @param msg the message to enqueue
+             * @param queue the name of the queue onto which it is to be enqueued
+             * @param xid (a pointer to) an identifier of the
+             * distributed transaction in which the operation takes
+             * place or null for 'local' transactions
+             */
+            virtual void enqueue(Message::shared_ptr& msg, const string& queue, const string * const xid) = 0;
+            /**
+             * Dequeues a message, recording that the given message is
+             * no longer on the given queue and deleting the message
+             * if it is no longer on any other queue.
+             * 
+             * @param msg the message to dequeue
+             * @param queue the name of th queue from which it is to be dequeued
+             * @param xid (a pointer to) an identifier of the
+             * distributed transaction in which the operation takes
+             * place or null for 'local' transactions
+             */
+            virtual void dequeue(Message::shared_ptr& msg, const string& queue, const string * const xid) = 0;
+            /**
+             * Treat all enqueue/dequeues where this xid was specified as being committed.
+             */
+            virtual void committed(const string * const xid) = 0;
+            /**
+             * Treat all enqueue/dequeues where this xid was specified as being aborted.
+             */
+            virtual void aborted(const string * const xid) = 0;
+
+            virtual ~MessageStore(){}
+        };
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Prefetch.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Prefetch.cpp?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Prefetch.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Prefetch.cpp Mon Oct 30 11:27:54 2006
@@ -0,0 +1,26 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/broker/Prefetch.h"
+
+using namespace qpid::broker;
+
+void Prefetch::reset(){
+    size = 0;
+    count = 0;
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Prefetch.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Prefetch.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Prefetch.h?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Prefetch.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Prefetch.h Mon Oct 30 11:27:54 2006
@@ -0,0 +1,39 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Prefetch_
+#define _Prefetch_
+
+#include "qpid/framing/amqp_types.h"
+
+namespace qpid {
+    namespace broker {
+        /**
+         * Count and total size of asynchronously delivered
+         * (i.e. pushed) messages that have acks outstanding.
+         */
+        struct Prefetch{
+            u_int32_t size;
+            u_int16_t count;
+
+            void reset();
+        };
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Prefetch.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Mon Oct 30 11:27:54 2006
@@ -16,16 +16,21 @@
  *
  */
 #include "qpid/broker/Queue.h"
+#include "qpid/broker/MessageStore.h"
 #include "qpid/concurrent/MonitorImpl.h"
 #include <iostream>
 
 using namespace qpid::broker;
 using namespace qpid::concurrent;
 
-Queue::Queue(const string& _name, bool _durable, u_int32_t _autodelete, const ConnectionToken* const _owner) :
+Queue::Queue(const string& _name, bool _durable, u_int32_t _autodelete, 
+             MessageStore* const _store,
+             const ConnectionToken* const _owner) :
+
     name(_name), 
     autodelete(_autodelete),
     durable(_durable), 
+    store(_store),
     owner(_owner), 
     queueing(false),
     dispatching(false),
@@ -48,6 +53,11 @@
 }
 
 void Queue::deliver(Message::shared_ptr& msg){
+    enqueue(msg, 0);
+    process(msg);
+}
+
+void Queue::process(Message::shared_ptr& msg){
     Locker locker(lock);
     if(queueing || !dispatch(msg)){
         queueing = true;
@@ -152,4 +162,18 @@
 bool Queue::canAutoDelete() const{
     Locker locker(lock);
     return lastUsed && ((apr_time_as_msec(apr_time_now()) - lastUsed) > autodelete);
+}
+
+void Queue::enqueue(Message::shared_ptr& msg, const string * const xid){
+    bool persistent(false);//TODO: pull this from headers
+    if(persistent){
+        store->enqueue(msg, name, xid);
+    }
+}
+
+void Queue::dequeue(Message::shared_ptr& msg, const string * const xid){
+    bool persistent(false);//TODO: pull this from headers
+    if(persistent){
+        store->dequeue(msg, name, xid);
+    }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Mon Oct 30 11:27:54 2006
@@ -31,6 +31,7 @@
 
 namespace qpid {
     namespace broker {
+        class MessageStore;
 
         /**
          * Thrown when exclusive access would be violated.
@@ -47,6 +48,7 @@
             const string name;
             const u_int32_t autodelete;
             const bool durable;
+            MessageStore* const store;
             const ConnectionToken* const owner;
             std::vector<Consumer*> consumers;
             std::queue<Binding*> bindings;
@@ -67,7 +69,9 @@
 
             typedef std::vector<shared_ptr> vector;
 	    
-            Queue(const string& name, bool durable = false, u_int32_t autodelete = 0, const ConnectionToken* const owner = 0);
+            Queue(const string& name, bool durable = false, u_int32_t autodelete = 0, 
+                  MessageStore* const store = 0, 
+                  const ConnectionToken* const owner = 0);
             ~Queue();
             /**
              * Informs the queue of a binding that should be cancelled on
@@ -75,13 +79,16 @@
              */
             void bound(Binding* b);
             /**
-             * Delivers a message to the queue from where it will be
-             * dispatched to immediately to a consumer if one is
-             * available or stored for dequeue or later dispatch if
-             * not.
+             * Delivers a message to the queue. Will record it as
+             * enqueued if persistent then process it.
              */
             void deliver(Message::shared_ptr& msg);
             /**
+             * Dispatches the messages immediately to a consumer if
+             * one is available or stores it for later if not.
+             */
+            void process(Message::shared_ptr& msg);
+            /**
              * Dispatch any queued messages providing there are
              * consumers for them. Only one thread can be dispatching
              * at any time, but this method (rather than the caller)
@@ -98,6 +105,9 @@
             inline const bool isExclusiveOwner(const ConnectionToken* const o) const { return o == owner; }
             inline bool hasExclusiveConsumer() const { return exclusive; }
             bool canAutoDelete() const;
+
+            void enqueue(Message::shared_ptr& msg, const string * const xid);
+            void dequeue(Message::shared_ptr& msg, const string * const xid);
         };
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp Mon Oct 30 11:27:54 2006
@@ -29,14 +29,15 @@
 QueueRegistry::~QueueRegistry(){}
 
 std::pair<Queue::shared_ptr, bool>
-QueueRegistry::declare(const string& declareName, bool durable, u_int32_t autoDelete, const ConnectionToken* owner)
+QueueRegistry::declare(const string& declareName, bool durable, u_int32_t autoDelete, 
+                       MessageStore* const store, const ConnectionToken* owner)
 {
     Locker locker(lock);
     string name = declareName.empty() ? generateName() : declareName;
     assert(!name.empty());
     QueueMap::iterator i =  queues.find(name);
     if (i == queues.end()) {
-	Queue::shared_ptr queue(new Queue(name, durable, autoDelete, owner));
+	Queue::shared_ptr queue(new Queue(name, durable, autoDelete, store, owner));
 	queues[name] = queue;
 	return std::pair<Queue::shared_ptr, bool>(queue, true);
     } else {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h Mon Oct 30 11:27:54 2006
@@ -46,7 +46,9 @@
      * @return The queue and a boolean flag which is true if the queue
      * was created by this declare call false if it already existed.
      */
-    std::pair<Queue::shared_ptr, bool> declare(const string& name, bool durable = false, u_int32_t autodelete = 0, const ConnectionToken* const owner = 0);
+    std::pair<Queue::shared_ptr, bool> declare(const string& name, bool durable = false, u_int32_t autodelete = 0, 
+                                               MessageStore* const _store = 0,
+                                               const ConnectionToken* const owner = 0);
 
     /**
      * Destroy the named queue.

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.cpp?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.cpp Mon Oct 30 11:27:54 2006
@@ -19,7 +19,6 @@
 #include "qpid/broker/SessionHandlerImpl.h"
 #include "qpid/broker/FanOutExchange.h"
 #include "qpid/broker/HeadersExchange.h"
-#include "qpid/broker/Router.h"
 #include "qpid/broker/TopicExchange.h"
 #include "assert.h"
 
@@ -40,11 +39,12 @@
     exchanges(_exchanges),
     cleaner(_cleaner),
     timeout(_timeout),
-    connectionHandler(new ConnectionHandlerImpl(this)),
-    channelHandler(new ChannelHandlerImpl(this)),
     basicHandler(new BasicHandlerImpl(this)),
+    channelHandler(new ChannelHandlerImpl(this)),
+    connectionHandler(new ConnectionHandlerImpl(this)),
     exchangeHandler(new ExchangeHandlerImpl(this)),
     queueHandler(new QueueHandlerImpl(this)),
+    txHandler(new TxHandlerImpl(this)),
     framemax(65536), 
     heartbeat(0) {}
 
@@ -146,11 +146,11 @@
 }
 
 void SessionHandlerImpl::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body){
-    getChannel(channel)->handleHeader(body, Router(*exchanges));
+    getChannel(channel)->handleHeader(body);
 }
 
 void SessionHandlerImpl::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body){
-    getChannel(channel)->handleContent(body, Router(*exchanges));
+    getChannel(channel)->handleContent(body);
 }
 
 void SessionHandlerImpl::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
@@ -261,7 +261,8 @@
     if (passive && !name.empty()) {
 	queue = parent->getQueue(name, channel);
     } else {
-	std::pair<Queue::shared_ptr, bool> queue_created =  parent->queues->declare(name, durable, autoDelete ? parent->timeout : 0, exclusive ? parent : 0);
+	std::pair<Queue::shared_ptr, bool> queue_created =  
+            parent->queues->declare(name, durable, autoDelete ? parent->timeout : 0, 0, exclusive ? parent : 0);
 	queue = queue_created.first;
 	assert(queue);
 	if (queue_created.second) { // This is a new queue
@@ -367,11 +368,16 @@
 } 
         
 void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t /*ticket*/, 
-                                                   string& exchange, string& routingKey, 
+                                                   string& exchangeName, string& routingKey, 
                                                    bool mandatory, bool immediate){
 
-    Message* msg = new Message(parent, exchange, routingKey, mandatory, immediate);
-    parent->getChannel(channel)->handlePublish(msg);
+    Exchange* exchange = exchangeName.empty() ? parent->exchanges->getDefault() : parent->exchanges->get(exchangeName);
+    if(exchange){
+        Message* msg = new Message(parent, exchangeName, routingKey, mandatory, immediate);
+        parent->getChannel(channel)->handlePublish(msg, exchange);
+    }else{
+        throw ChannelException(404, "Exchange not found '" + exchangeName + "'");
+    }
 } 
         
 void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t /*ticket*/, string& queueName, bool noAck){
@@ -395,4 +401,20 @@
 void SessionHandlerImpl::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){
     parent->getChannel(channel)->recover(requeue);
 } 
+
+void SessionHandlerImpl::TxHandlerImpl::select(u_int16_t channel){
+    parent->getChannel(channel)->begin();
+    parent->client.getTx().selectOk(channel);
+}
+
+void SessionHandlerImpl::TxHandlerImpl::commit(u_int16_t channel){
+    parent->getChannel(channel)->commit();
+    parent->client.getTx().commitOk(channel);
+}
+
+void SessionHandlerImpl::TxHandlerImpl::rollback(u_int16_t channel){
+    parent->getChannel(channel)->rollback();
+    parent->client.getTx().rollbackOk(channel);
+    parent->getChannel(channel)->recover(false);    
+}
               

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.h?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandlerImpl.h Mon Oct 30 11:27:54 2006
@@ -71,11 +71,12 @@
     AutoDelete* const cleaner;
     const u_int32_t timeout;//timeout for auto-deleted queues (in ms)
 
-    std::auto_ptr<ConnectionHandler> connectionHandler;
-    std::auto_ptr<ChannelHandler> channelHandler;
     std::auto_ptr<BasicHandler> basicHandler;
+    std::auto_ptr<ChannelHandler> channelHandler;
+    std::auto_ptr<ConnectionHandler> connectionHandler;
     std::auto_ptr<ExchangeHandler> exchangeHandler;
     std::auto_ptr<QueueHandler> queueHandler;
+    std::auto_ptr<TxHandler> txHandler;
 
     std::map<u_int16_t, Channel*> channels;
     std::vector<Queue::shared_ptr> exclusiveQueues;
@@ -212,18 +213,29 @@
         virtual ~BasicHandlerImpl(){}
     };
 
+    class TxHandlerImpl : public virtual TxHandler{
+        SessionHandlerImpl* parent;
+    public:
+        TxHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+        virtual ~TxHandlerImpl() {}
+        virtual void select(u_int16_t channel);
+        virtual void commit(u_int16_t channel);
+        virtual void rollback(u_int16_t channel);
+    };
+
+
     inline virtual ChannelHandler* getChannelHandler(){ return channelHandler.get(); }
     inline virtual ConnectionHandler* getConnectionHandler(){ return connectionHandler.get(); }
     inline virtual BasicHandler* getBasicHandler(){ return basicHandler.get(); }
     inline virtual ExchangeHandler* getExchangeHandler(){ return exchangeHandler.get(); }
     inline virtual QueueHandler* getQueueHandler(){ return queueHandler.get(); }
+    inline virtual TxHandler* getTxHandler(){ return txHandler.get(); }       
  
-    inline virtual AccessHandler* getAccessHandler(){ return 0; }       
-    inline virtual FileHandler* getFileHandler(){ return 0; }       
-    inline virtual StreamHandler* getStreamHandler(){ return 0; }       
-    inline virtual TxHandler* getTxHandler(){ return 0; }       
-    inline virtual DtxHandler* getDtxHandler(){ return 0; }       
-    inline virtual TunnelHandler* getTunnelHandler(){ return 0; }       
+    inline virtual AccessHandler* getAccessHandler(){ throw ConnectionException(540, "Access class not implemented"); }       
+    inline virtual FileHandler* getFileHandler(){ throw ConnectionException(540, "File class not implemented"); }       
+    inline virtual StreamHandler* getStreamHandler(){ throw ConnectionException(540, "Stream class not implemented"); }       
+    inline virtual DtxHandler* getDtxHandler(){ throw ConnectionException(540, "Dtx class not implemented"); }       
+    inline virtual TunnelHandler* getTunnelHandler(){ throw ConnectionException(540, "Tunnel class not implemented"); }       
 };
 
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp Mon Oct 30 11:27:54 2006
@@ -135,13 +135,13 @@
 }
 
 
-void TopicExchange::route(Message::shared_ptr& msg, const string& routingKey, FieldTable* /*args*/){
+void TopicExchange::route(Deliverable& msg, const string& routingKey, FieldTable* /*args*/){
     lock.acquire();
     for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
         if (i->first.match(routingKey)) {
             Queue::vector& qv(i->second);
             for(Queue::vector::iterator j = qv.begin(); j != qv.end(); j++){
-                (*j)->deliver(msg);
+                msg.deliverTo(*j);
             }
         }
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h?view=diff&rev=469242&r1=469241&r2=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h Mon Oct 30 11:27:54 2006
@@ -82,7 +82,7 @@
 
     virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
 
-    virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args);
+    virtual void route(Deliverable& msg, const string& routingKey, qpid::framing::FieldTable* args);
 
     virtual ~TopicExchange();
 };

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TransactionalStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TransactionalStore.h?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TransactionalStore.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TransactionalStore.h Mon Oct 30 11:27:54 2006
@@ -0,0 +1,35 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _TransactionalStore_
+#define _TransactionalStore_
+
+namespace qpid {
+    namespace broker {
+        class TransactionalStore{
+        public:
+            virtual void begin() = 0;
+            virtual void commit() = 0;
+            virtual void abort() = 0;
+
+            virtual ~TransactionalStore(){}
+        };
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TransactionalStore.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp Mon Oct 30 11:27:54 2006
@@ -0,0 +1,46 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/broker/TxAck.h"
+
+using std::bind1st;
+using std::bind2nd;
+using std::mem_fun_ref;
+using namespace qpid::broker;
+
+TxAck::TxAck(AccumulatedAck _acked, std::list<DeliveryRecord>& _unacked) : acked(_acked), unacked(_unacked){
+
+}
+
+bool TxAck::prepare() throw(){
+    try{
+        //dequeue all acked messages from their queues
+        for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::discardIfCoveredBy), &acked));
+        return true;
+    }catch(...){
+        std::cout << "TxAck::prepare() - Failed to prepare" << std::endl;
+        return false;
+    }
+}
+
+void TxAck::commit() throw(){
+    //remove all acked records from the list
+    unacked.remove_if(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked));
+}
+
+void TxAck::rollback() throw(){
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.h?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.h Mon Oct 30 11:27:54 2006
@@ -0,0 +1,44 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _TxAck_
+#define _TxAck_
+
+#include <algorithm>
+#include <functional>
+#include <list>
+#include "qpid/broker/AccumulatedAck.h"
+#include "qpid/broker/DeliveryRecord.h"
+#include "qpid/broker/TxOp.h"
+
+namespace qpid {
+    namespace broker {
+        class TxAck : public TxOp{
+            AccumulatedAck acked;
+            std::list<DeliveryRecord>& unacked;
+        public:
+            TxAck(AccumulatedAck acked, std::list<DeliveryRecord>& unacked);
+            virtual bool prepare() throw();
+            virtual void commit() throw();
+            virtual void rollback() throw();
+            virtual ~TxAck(){}
+        };
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAck.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp?view=auto&rev=469242
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp Mon Oct 30 11:27:54 2006
@@ -0,0 +1,48 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/broker/TxBuffer.h"
+
+using namespace qpid::broker;
+
+bool TxBuffer::prepare(TransactionalStore* const store){
+    if(store) store->begin();
+    for(op_iterator i = ops.begin(); i < ops.end(); i++){
+        if(!(*i)->prepare()){
+            if(store) store->abort();
+            return false;
+        }
+    }
+    if(store) store->commit();
+    return true;
+}
+
+void TxBuffer::commit(){
+    for(op_iterator i = ops.begin(); i < ops.end(); i++){
+        (*i)->commit();
+    }
+}
+
+void TxBuffer::rollback(){
+    for(op_iterator i = ops.begin(); i < ops.end(); i++){
+        (*i)->rollback();
+    }
+}
+
+void TxBuffer::enlist(TxOp* const op){
+    ops.push_back(op);
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp
------------------------------------------------------------------------------
    svn:eol-style = native