You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2012/08/10 14:04:32 UTC

svn commit: r1371676 [6/8] - in /qpid/trunk/qpid: cpp/src/ cpp/src/qpid/broker/ cpp/src/qpid/broker/amqp_0_10/ cpp/src/qpid/ha/ cpp/src/qpid/management/ cpp/src/qpid/replication/ cpp/src/qpid/store/ cpp/src/qpid/xml/ cpp/src/tests/ tests/src/py/qpid_te...

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Fri Aug 10 12:04:27 2012
@@ -21,6 +21,7 @@
 #include "qpid/broker/SessionState.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/ConnectionState.h"
+#include "qpid/broker/DeliverableMessage.h"
 #include "qpid/broker/DeliveryRecord.h"
 #include "qpid/broker/SessionManager.h"
 #include "qpid/broker/SessionHandler.h"
@@ -28,6 +29,7 @@
 #include "qpid/framing/AMQContentBody.h"
 #include "qpid/framing/AMQHeaderBody.h"
 #include "qpid/framing/AMQMethodBody.h"
+#include "qpid/framing/MessageTransferBody.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/framing/ServerInvoker.h"
 #include "qpid/log/Statement.h"
@@ -55,9 +57,8 @@ SessionState::SessionState(
     const SessionState::Configuration& config, bool delayManagement)
     : qpid::SessionState(id, config),
       broker(b), handler(&h),
-      semanticState(*this, *this),
+      semanticState(*this),
       adapter(semanticState),
-      msgBuilder(&broker.getStore()),
       mgmtObject(0),
       asyncCommandCompleter(new AsyncCommandCompleter(this))
 {
@@ -208,7 +209,7 @@ void SessionState::handleContent(AMQFram
 {
     if (frame.getBof() && frame.getBos()) //start of frameset
         msgBuilder.start(id);
-    intrusive_ptr<Message> msg(msgBuilder.getMessage());
+    intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> msg(msgBuilder.getMessage());
     msgBuilder.handle(frame);
     if (frame.getEof() && frame.getEos()) {//end of frameset
         if (frame.getBof()) {
@@ -218,13 +219,16 @@ void SessionState::handleContent(AMQFram
             header.setEof(false);
             msg->getFrames().append(header);
         }
+        DeliverableMessage deliverable(Message(msg, msg), semanticState.getTxBuffer());
         if (broker.isTimestamping())
-            msg->setTimestamp();
-        msg->setPublisher(&getConnection());
+            deliverable.getMessage().setTimestamp();
+        deliverable.getMessage().setPublisher(&getConnection());
+
+
+        IncompleteIngressMsgXfer xfer(this, msg);
         msg->getIngressCompletion().begin();
-        semanticState.handle(msg);
+        semanticState.route(deliverable.getMessage(), deliverable);
         msgBuilder.end();
-        IncompleteIngressMsgXfer xfer(this, msg);
         msg->getIngressCompletion().end(xfer);  // allows msg to complete xfer
     }
 }
@@ -294,18 +298,28 @@ void SessionState::handleOut(AMQFrame& f
     handler->out(frame);
 }
 
-void SessionState::deliver(DeliveryRecord& msg, bool sync)
+DeliveryId SessionState::deliver(const qpid::broker::amqp_0_10::MessageTransfer& message,
+                                 const std::string& destination, bool isRedelivered, uint64_t ttl, uint64_t timestamp,
+                                 qpid::framing::message::AcceptMode acceptMode, qpid::framing::message::AcquireMode acquireMode,
+                                 const qpid::types::Variant::Map& annotations, bool sync)
 {
     uint32_t maxFrameSize = getConnection().getFrameMax();
     assert(senderGetCommandPoint().offset == 0);
     SequenceNumber commandId = senderGetCommandPoint().command;
-    msg.deliver(getProxy().getHandler(), commandId, maxFrameSize);
+
+    framing::AMQFrame method((framing::MessageTransferBody(framing::ProtocolVersion(), destination, acceptMode, acquireMode)));
+    method.setEof(false);
+    getProxy().getHandler().handle(method);
+    message.sendHeader(getProxy().getHandler(), maxFrameSize, isRedelivered, ttl, timestamp, annotations);
+    message.sendContent(getProxy().getHandler(), maxFrameSize);
+
     assert(senderGetCommandPoint() == SessionPoint(commandId+1, 0)); // Delivery has moved sendPoint.
     if (sync) {
         AMQP_ClientProxy::Execution& p(getProxy().getExecution());
         Proxy::ScopedSync s(p);
         p.sync();
     }
+    return commandId;
 }
 
 void SessionState::sendCompletion() {
@@ -349,7 +363,6 @@ void SessionState::addPendingExecutionSy
     }
 }
 
-
 /** factory for creating a reference-counted IncompleteIngressMsgXfer object
  * which will be attached to a message that will be completed asynchronously.
  */
@@ -408,10 +421,10 @@ void SessionState::AsyncCommandCompleter
 
 
 /** Track an ingress message that is pending completion */
-void SessionState::AsyncCommandCompleter::addPendingMessage(boost::intrusive_ptr<Message> msg)
+void SessionState::AsyncCommandCompleter::addPendingMessage(boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> msg)
 {
     qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
-    std::pair<SequenceNumber, boost::intrusive_ptr<Message> > item(msg->getCommandId(), msg);
+    std::pair<SequenceNumber, boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> > item(msg->getCommandId(), msg);
     bool unique = pendingMsgs.insert(item).second;
     if (!unique) {
       assert(false);
@@ -430,13 +443,13 @@ void SessionState::AsyncCommandCompleter
 /** done when an execution.sync arrives */
 void SessionState::AsyncCommandCompleter::flushPendingMessages()
 {
-    std::map<SequenceNumber, boost::intrusive_ptr<Message> > copy;
+    std::map<SequenceNumber, boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> > copy;
     {
         qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
         pendingMsgs.swap(copy);    // we've only tracked these in case a flush is needed, so nuke 'em now.
     }
     // drop lock, so it is safe to call "flush()"
-    for (std::map<SequenceNumber, boost::intrusive_ptr<Message> >::iterator i = copy.begin();
+    for (std::map<SequenceNumber, boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> >::iterator i = copy.begin();
          i != copy.end(); ++i) {
         i->second->flush();
     }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Fri Aug 10 12:04:27 2012
@@ -23,17 +23,18 @@
  */
 
 #include "qpid/SessionState.h"
+#include "qpid/framing/enum.h"
 #include "qpid/framing/FrameHandler.h"
 #include "qpid/framing/SequenceSet.h"
 #include "qpid/sys/Time.h"
 #include "qpid/management/Manageable.h"
 #include "qmf/org/apache/qpid/broker/Session.h"
 #include "qpid/broker/SessionAdapter.h"
-#include "qpid/broker/DeliveryAdapter.h"
 #include "qpid/broker/AsyncCompletion.h"
 #include "qpid/broker/MessageBuilder.h"
 #include "qpid/broker/SessionContext.h"
 #include "qpid/broker/SemanticState.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
 #include "qpid/sys/Monitor.h"
 
 #include <boost/noncopyable.hpp>
@@ -58,7 +59,6 @@ namespace broker {
 
 class Broker;
 class ConnectionState;
-class Message;
 class SessionHandler;
 class SessionManager;
 
@@ -68,7 +68,6 @@ class SessionManager;
  */
 class SessionState : public qpid::SessionState,
                      public SessionContext,
-                     public DeliveryAdapter,
                      public management::Manageable,
                      public framing::FrameHandler::InOutHandler
 {
@@ -105,8 +104,10 @@ class SessionState : public qpid::Sessio
 
     void sendCompletion();
 
-    //delivery adapter methods:
-    void deliver(DeliveryRecord&, bool sync);
+    DeliveryId deliver(const qpid::broker::amqp_0_10::MessageTransfer& message,
+                       const std::string& destination, bool isRedelivered, uint64_t ttl, uint64_t timestamp,
+                       qpid::framing::message::AcceptMode, qpid::framing::message::AcquireMode,
+                       const qpid::types::Variant::Map& annotations, bool sync);
 
     // Manageable entry points
     management::ManagementObject* GetManagementObject (void) const;
@@ -117,7 +118,7 @@ class SessionState : public qpid::Sessio
 
     // Used by cluster to create replica sessions.
     SemanticState& getSemanticState() { return semanticState; }
-    boost::intrusive_ptr<Message> getMessageInProgress() { return msgBuilder.getMessage(); }
+    boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> getMessageInProgress() { return msgBuilder.getMessage(); }
     SessionAdapter& getSessionAdapter() { return adapter; }
 
     const SessionId& getSessionId() const { return getId(); }
@@ -199,7 +200,7 @@ class SessionState : public qpid::Sessio
         // If an ingress message does not require a Sync, we need to
         // hold a reference to it in case an Execution.Sync command is received and we
         // have to manually flush the message.
-        std::map<SequenceNumber, boost::intrusive_ptr<Message> > pendingMsgs;
+        std::map<SequenceNumber, boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> > pendingMsgs;
 
         /** complete all pending commands, runs in IO thread */
         void completeCommands();
@@ -212,7 +213,7 @@ class SessionState : public qpid::Sessio
         ~AsyncCommandCompleter() {};
 
         /** track a message pending ingress completion */
-        void addPendingMessage(boost::intrusive_ptr<Message> m);
+        void addPendingMessage(boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> m);
         void deletePendingMessage(SequenceNumber id);
         void flushPendingMessages();
         /** schedule the processing of a completed ingress message.transfer command */
@@ -246,29 +247,29 @@ class SessionState : public qpid::Sessio
     {
      public:
         IncompleteIngressMsgXfer( SessionState *ss,
-                                  boost::intrusive_ptr<Message> m )
+                                  boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> m)
           : AsyncCommandContext(ss, m->getCommandId()),
-          session(ss),
-          msg(m),
-          requiresAccept(m->requiresAccept()),
-          requiresSync(m->getFrames().getMethod()->isSync()),
-          pending(false) {}
+            session(ss),
+            msg(m),
+            requiresAccept(m->requiresAccept()),
+            requiresSync(m->getFrames().getMethod()->isSync()),
+            pending(false) {}
         IncompleteIngressMsgXfer( const IncompleteIngressMsgXfer& x )
           : AsyncCommandContext(x.session, x.msg->getCommandId()),
-          session(x.session),
-          msg(x.msg),
-          requiresAccept(x.requiresAccept),
-          requiresSync(x.requiresSync),
-          pending(x.pending) {}
+            session(x.session),
+            msg(x.msg),
+            requiresAccept(x.requiresAccept),
+            requiresSync(x.requiresSync),
+            pending(x.pending) {}
 
-  virtual ~IncompleteIngressMsgXfer() {};
+        virtual ~IncompleteIngressMsgXfer() {};
 
         virtual void completed(bool);
         virtual boost::intrusive_ptr<AsyncCompletion::Callback> clone();
 
      private:
         SessionState *session;  // only valid if sync flag in callback is true
-        boost::intrusive_ptr<Message> msg;
+        boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> msg;
         bool requiresAccept;
         bool requiresSync;
         bool pending;   // true if msg saved on pending list...

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp Fri Aug 10 12:04:27 2012
@@ -20,7 +20,8 @@
  */
 #include "qpid/broker/ThresholdAlerts.h"
 #include "qpid/broker/Queue.h"
-#include "qpid/broker/QueuedMessage.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
 #include "qpid/amqp_0_10/Codecs.h"
 #include "qpid/log/Statement.h"
 #include "qpid/management/ManagementAgent.h"
@@ -30,21 +31,20 @@ namespace qpid {
 namespace broker {
 namespace {
 const qmf::org::apache::qpid::broker::EventQueueThresholdExceeded EVENT("dummy", 0, 0);
-bool isQMFv2(const boost::intrusive_ptr<Message> message)
+bool isQMFv2(const Message& message)
 {
-    const qpid::framing::MessageProperties* props = message->getProperties<qpid::framing::MessageProperties>();
+    const qpid::framing::MessageProperties* props = qpid::broker::amqp_0_10::MessageTransfer::get(message).getProperties<qpid::framing::MessageProperties>();
     return props && props->getAppId() == "qmf2";
 }
 
-bool isThresholdEvent(const boost::intrusive_ptr<Message> message)
+bool isThresholdEvent(const Message& message)
 {
-    if (message->getIsManagementMessage()) {
+    if (message.getIsManagementMessage()) {
         //is this a qmf event? if so is it a threshold event?
         if (isQMFv2(message)) {
-            const qpid::framing::FieldTable* headers = message->getApplicationHeaders();
-            if (headers && headers->getAsString("qmf.content") == "_event") {
+            if (message.getPropertyAsString("qmf.content") == "_event") {
                 //decode as list
-                std::string content = message->getFrames().getContent();
+                std::string content = qpid::broker::amqp_0_10::MessageTransfer::get(message).getFrames().getContent();
                 qpid::types::Variant::List list;
                 qpid::amqp_0_10::ListCodec::decode(content, list);
                 if (list.empty() || list.front().getType() != qpid::types::VAR_MAP) return false;
@@ -57,7 +57,7 @@ bool isThresholdEvent(const boost::intru
                 }
             }
         } else {
-            std::string content = message->getFrames().getContent();
+            std::string content = qpid::broker::amqp_0_10::MessageTransfer::get(message).getFrames().getContent();
             qpid::framing::Buffer buffer(const_cast<char*>(content.data()), content.size());
             if (buffer.getOctet() == 'A' && buffer.getOctet() == 'M' && buffer.getOctet() == '2' && buffer.getOctet() == 'e') {
                 buffer.getLong();//sequence
@@ -83,9 +83,9 @@ ThresholdAlerts::ThresholdAlerts(const s
       repeatInterval(repeat ? repeat*qpid::sys::TIME_SEC : 0),
       count(0), size(0), lastAlert(qpid::sys::EPOCH) {}
 
-void ThresholdAlerts::enqueued(const QueuedMessage& m)
+void ThresholdAlerts::enqueued(const Message& m)
 {
-    size += m.payload->contentSize();
+    size += m.getContentSize();
     ++count;
     if ((countThreshold && count >= countThreshold) || (sizeThreshold && size >= sizeThreshold)) {
         if ((repeatInterval == 0 && lastAlert == qpid::sys::EPOCH)
@@ -94,7 +94,7 @@ void ThresholdAlerts::enqueued(const Que
             //enqueued on queues; it may even be that this event
             //causes a message to be enqueued on the queue we are
             //tracking, and so we need to avoid recursing
-            if (isThresholdEvent(m.payload)) return;
+            if (isThresholdEvent(m)) return;
             lastAlert = qpid::sys::now();
             agent.raiseEvent(qmf::org::apache::qpid::broker::EventQueueThresholdExceeded(name, count, size));
             QPID_LOG(info, "Threshold event triggered for " << name << ", count=" << count << ", size=" << size);
@@ -102,9 +102,9 @@ void ThresholdAlerts::enqueued(const Que
     }
 }
 
-void ThresholdAlerts::dequeued(const QueuedMessage& m)
+void ThresholdAlerts::dequeued(const Message& m)
 {
-    size -= m.payload->contentSize();
+    size -= m.getContentSize();
     --count;
     if ((countThreshold && count < countThreshold) || (sizeThreshold && size < sizeThreshold)) {
         lastAlert = qpid::sys::EPOCH;
@@ -127,65 +127,16 @@ void ThresholdAlerts::observe(Queue& que
 }
 
 void ThresholdAlerts::observe(Queue& queue, qpid::management::ManagementAgent& agent,
-                              const qpid::framing::FieldTable& settings, uint16_t limitRatio)
-
-{
-    qpid::types::Variant::Map map;
-    qpid::amqp_0_10::translate(settings, map);
-    observe(queue, agent, map, limitRatio);
-}
-
-template <class T>
-class Option
+                              const QueueSettings& settings, uint16_t limitRatio)
 {
-  public:
-    Option(const std::string& name, T d) : defaultValue(d) { names.push_back(name); }
-    void addAlias(const std::string& name) { names.push_back(name); }
-    T get(const qpid::types::Variant::Map& settings) const
-    {
-        T value(defaultValue);
-        for (std::vector<std::string>::const_iterator i = names.begin(); i != names.end(); ++i) {
-            if (get(settings, *i, value)) break;
-        }
-        return value;
-    }
-  private:
-    std::vector<std::string> names;
-    T defaultValue;
-
-    bool get(const qpid::types::Variant::Map& settings, const std::string& name, T& value) const
-    {
-        qpid::types::Variant::Map::const_iterator i = settings.find(name);
-        if (i != settings.end()) {
-            try {
-                value = (T) i->second;
-            } catch (const qpid::types::InvalidConversion&) {
-                QPID_LOG(warning, "Bad value for" << name << ": " << i->second);
-            }
-            return true;
-        } else {
-            return false;
-        }
-    }
-};
-
-void ThresholdAlerts::observe(Queue& queue, qpid::management::ManagementAgent& agent,
-                              const qpid::types::Variant::Map& settings, uint16_t limitRatio)
-
-{
-    //Note: aliases are keys defined by java broker
-    Option<int64_t> repeatInterval("qpid.alert_repeat_gap", 60);
-    repeatInterval.addAlias("x-qpid-minimum-alert-repeat-gap");
-
     //If no explicit threshold settings were given use specified
     //percentage of any limit from the policy.
-    const QueuePolicy* policy = queue.getPolicy();
-    Option<uint32_t> countThreshold("qpid.alert_count", (uint32_t) (policy && limitRatio ? (policy->getMaxCount()*limitRatio/100) : 0));
-    countThreshold.addAlias("x-qpid-maximum-message-count");
-    Option<uint64_t> sizeThreshold("qpid.alert_size", (uint64_t) (policy && limitRatio ? (policy->getMaxSize()*limitRatio/100) : 0));
-    sizeThreshold.addAlias("x-qpid-maximum-message-size");
+    //The size threshold is kept 64 bits wide, matching the sizeThreshold
+    //parameter of the observe() overload called below, so it is not narrowed here.
+    uint32_t countThreshold = settings.alertThreshold.hasCount() ? settings.alertThreshold.getCount() : (settings.maxDepth.getCount()*limitRatio/100);
+    uint64_t sizeThreshold = settings.alertThreshold.hasSize() ? settings.alertThreshold.getSize() : (settings.maxDepth.getSize()*limitRatio/100);
 
-    observe(queue, agent, countThreshold.get(settings), sizeThreshold.get(settings), repeatInterval.get(settings));
+    observe(queue, agent, countThreshold, sizeThreshold, settings.alertRepeatInterval);
 }
 
 }}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ThresholdAlerts.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ThresholdAlerts.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ThresholdAlerts.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ThresholdAlerts.h Fri Aug 10 12:04:27 2012
@@ -27,15 +27,13 @@
 #include <string>
 
 namespace qpid {
-namespace framing {
-class FieldTable;
-}
 namespace management {
 class ManagementAgent;
 }
 namespace broker {
 
 class Queue;
+struct QueueSettings;
 /**
  * Class to manage generation of QMF alerts when particular thresholds
  * are breached on a queue.
@@ -48,19 +46,17 @@ class ThresholdAlerts : public QueueObse
                     const uint32_t countThreshold,
                     const uint64_t sizeThreshold,
                     const long repeatInterval);
-    void enqueued(const QueuedMessage&);
-    void dequeued(const QueuedMessage&);
-    void acquired(const QueuedMessage&) {};
-    void requeued(const QueuedMessage&) {};
+    void enqueued(const Message&);
+    void dequeued(const Message&);
+    void acquired(const Message&) {};
+    void requeued(const Message&) {};
 
     static void observe(Queue& queue, qpid::management::ManagementAgent& agent,
                         const uint64_t countThreshold,
                         const uint64_t sizeThreshold,
                         const long repeatInterval);
     static void observe(Queue& queue, qpid::management::ManagementAgent& agent,
-                        const qpid::framing::FieldTable& settings, uint16_t limitRatio);
-    static void observe(Queue& queue, qpid::management::ManagementAgent& agent,
-                        const qpid::types::Variant::Map& settings, uint16_t limitRatio);
+                        const QueueSettings& settings, uint16_t limitRatio);
   private:
     const std::string name;
     qpid::management::ManagementAgent& agent;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h Fri Aug 10 12:04:27 2012
@@ -71,7 +71,6 @@ namespace qpid {
             virtual void commit() throw();
             virtual void rollback() throw();
             virtual ~TxAccept(){}
-            virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
 
             // Used by cluster replication.
             const framing::SequenceSet& getAcked() const { return acked; }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp Fri Aug 10 12:04:27 2012
@@ -28,7 +28,7 @@ using namespace qpid::broker;
 
 bool TxBuffer::prepare(TransactionContext* const ctxt)
 {
-    for(op_iterator i = ops.begin(); i < ops.end(); i++){
+    for(op_iterator i = ops.begin(); i != ops.end(); i++){
         if(!(*i)->prepare(ctxt)){
             return false;
         }
@@ -74,7 +74,3 @@ bool TxBuffer::commitLocal(Transactional
     }
     return false;
 }
-
-void TxBuffer::accept(TxOpConstVisitor& v) const {
-    std::for_each(ops.begin(), ops.end(), boost::bind(&TxOp::accept, _1, boost::ref(v)));
-}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h Fri Aug 10 12:04:27 2012
@@ -108,9 +108,6 @@ namespace qpid {
              * commit
              */
             QPID_BROKER_EXTERN bool commitLocal(TransactionalStore* const store);
-
-            // Used by cluster to replicate transaction status.
-            void accept(TxOpConstVisitor& v) const;
         };
     }
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/TxOp.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/TxOp.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/TxOp.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/TxOp.h Fri Aug 10 12:04:27 2012
@@ -21,7 +21,6 @@
 #ifndef _TxOp_
 #define _TxOp_
 
-#include "qpid/broker/TxOpVisitor.h"
 #include "qpid/broker/TransactionalStore.h"
 #include <boost/shared_ptr.hpp>
 
@@ -36,8 +35,6 @@ namespace qpid {
             virtual void commit()  throw() = 0;
             virtual void rollback()  throw() = 0;
             virtual ~TxOp(){}
-
-            virtual void accept(TxOpConstVisitor&) const = 0;
         };
 
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/TxOpVisitor.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/TxOpVisitor.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/TxOpVisitor.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/TxOpVisitor.h Fri Aug 10 12:04:27 2012
@@ -1,97 +0,0 @@
-#ifndef QPID_BROKER_TXOPVISITOR_H
-#define QPID_BROKER_TXOPVISITOR_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-namespace qpid {
-namespace broker {
-
-class DtxAck;
-class RecoveredDequeue;
-class RecoveredEnqueue;
-class TxAccept;
-class TxPublish;
-
-/**
- * Visitor for TxOp familly of classes.
- */
-struct TxOpConstVisitor
-{
-    virtual ~TxOpConstVisitor() {}
-    virtual void operator()(const DtxAck&) = 0;
-    virtual void operator()(const RecoveredDequeue&) = 0;
-    virtual void operator()(const RecoveredEnqueue&) = 0;
-    virtual void operator()(const TxAccept&) = 0;
-    virtual void operator()(const TxPublish&) = 0;
-};
-
-}} // namespace qpid::broker
-
-#endif  /*!QPID_BROKER_TXOPVISITOR_H*/
-#ifndef QPID_BROKER_TXOPVISITOR_H
-#define QPID_BROKER_TXOPVISITOR_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-namespace qpid {
-namespace broker {
-
-class DtxAck;
-class RecoveredDequeue;
-class RecoveredEnqueue;
-class TxAccept;
-class TxPublish;
-
-/**
- * Visitor for TxOp familly of classes.
- */
-struct TxOpConstVisitor
-{
-    virtual ~TxOpConstVisitor() {}
-    virtual void operator()(const DtxAck&) = 0;
-    virtual void operator()(const RecoveredDequeue&) = 0;
-    virtual void operator()(const RecoveredEnqueue&) = 0;
-    virtual void operator()(const TxAccept&) = 0;
-    virtual void operator()(const TxPublish&) = 0;
-};
-
-}} // namespace qpid::broker
-
-#endif  /*!QPID_BROKER_TXOPVISITOR_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp Fri Aug 10 12:04:27 2012
@@ -1,111 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/log/Statement.h"
-#include "qpid/broker/TxPublish.h"
-#include "qpid/broker/Queue.h"
-
-using boost::intrusive_ptr;
-using namespace qpid::broker;
-
-TxPublish::TxPublish(intrusive_ptr<Message> _msg) : msg(_msg) {}
-
-bool TxPublish::prepare(TransactionContext* ctxt) throw()
-{
-    try{
-        while (!queues.empty()) {
-            prepare(ctxt, queues.front());
-            prepared.push_back(queues.front());
-            queues.pop_front();
-        }
-        return true;
-    }catch(const std::exception& e){
-        QPID_LOG(error, "Failed to prepare: " << e.what());
-    }catch(...){
-        QPID_LOG(error, "Failed to prepare (unknown error)");
-    }
-    return false;
-}
-
-void TxPublish::commit() throw()
-{
-    try {
-        for_each(prepared.begin(), prepared.end(), Commit(msg));
-        if (msg->isContentReleaseRequested()) {
-            // NOTE: The log messages in this section are used for flow-to-disk testing (which checks the log for the
-            // presence of these messages). Do not change these without also checking these tests.
-            if (msg->isContentReleaseBlocked()) {
-                QPID_LOG(debug, "Message id=\"" << msg->getProperties<qpid::framing::MessageProperties>()->getMessageId() << "\"; pid=0x" <<
-                                std::hex << msg->getPersistenceId() << std::dec << ": Content release blocked on commit");
-            } else {
-                msg->releaseContent();
-                QPID_LOG(debug, "Message id=\"" << msg->getProperties<qpid::framing::MessageProperties>()->getMessageId() << "\"; pid=0x" <<
-                                std::hex << msg->getPersistenceId() << std::dec << ": Content released on commit");
-            }
-        }
-    } catch (const std::exception& e) {
-        QPID_LOG(error, "Failed to commit: " << e.what());
-    } catch(...) {
-        QPID_LOG(error, "Failed to commit (unknown error)");
-    }
-}
-
-void TxPublish::rollback() throw()
-{
-    try {
-        for_each(prepared.begin(), prepared.end(), Rollback(msg));
-    } catch (const std::exception& e) {
-        QPID_LOG(error, "Failed to complete rollback: " << e.what());
-    } catch(...) {
-        QPID_LOG(error, "Failed to complete rollback (unknown error)");
-    }
-
-}
-
-void TxPublish::deliverTo(const boost::shared_ptr<Queue>& queue){
-    if (!queue->isLocal(msg)) {
-        queues.push_back(queue);
-        delivered = true;
-    } else {
-        QPID_LOG(debug, "Won't enqueue local message for " << queue->getName());
-    }
-}
-
-void TxPublish::prepare(TransactionContext* ctxt, const boost::shared_ptr<Queue> queue)
-{
-    queue->enqueue(ctxt, msg);
-}
-
-TxPublish::Commit::Commit(intrusive_ptr<Message>& _msg) : msg(_msg){}
-
-void TxPublish::Commit::operator()(const boost::shared_ptr<Queue>& queue){
-    queue->process(msg);
-}
-
-TxPublish::Rollback::Rollback(intrusive_ptr<Message>& _msg) : msg(_msg){}
-
-void TxPublish::Rollback::operator()(const boost::shared_ptr<Queue>& queue){
-    queue->enqueueAborted(msg);
-}
-
-uint64_t TxPublish::contentSize ()
-{
-    return msg->contentSize ();
-}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h Fri Aug 10 12:04:27 2012
@@ -1,92 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _TxPublish_
-#define _TxPublish_
-
-#include "qpid/broker/BrokerImportExport.h"
-#include "qpid/broker/Deliverable.h"
-#include "qpid/broker/Message.h"
-#include "qpid/broker/MessageStore.h"
-#include "qpid/broker/TxOp.h"
-
-#include <algorithm>
-#include <functional>
-#include <list>
-
-#include <boost/intrusive_ptr.hpp>
-
-namespace qpid {
-namespace broker {
-/**
- * Defines the behaviour for publish operations on a
- * transactional channel. Messages are routed through
- * exchanges when received but are not at that stage delivered
- * to the matching queues, rather the queues are held in an
- * instance of this class. On prepare() the message is marked
- * enqueued to the relevant queues in the MessagesStore. On
- * commit() the messages will be passed to the queue for
- * dispatch or to be added to the in-memory queue.
- */
-class QPID_BROKER_CLASS_EXTERN TxPublish : public TxOp, public Deliverable{
-
-    class Commit{
-        boost::intrusive_ptr<Message>& msg;
-      public:
-        Commit(boost::intrusive_ptr<Message>& msg);
-        void operator()(const boost::shared_ptr<Queue>& queue);
-    };
-    class Rollback{
-        boost::intrusive_ptr<Message>& msg;
-      public:
-        Rollback(boost::intrusive_ptr<Message>& msg);
-        void operator()(const boost::shared_ptr<Queue>& queue);
-    };
-
-    boost::intrusive_ptr<Message> msg;
-    std::list<boost::shared_ptr<Queue> > queues;
-    std::list<boost::shared_ptr<Queue> > prepared;
-
-    void prepare(TransactionContext* ctxt, boost::shared_ptr<Queue>);
-
-  public:
-    QPID_BROKER_EXTERN TxPublish(boost::intrusive_ptr<Message> msg);
-    QPID_BROKER_EXTERN virtual bool prepare(TransactionContext* ctxt) throw();
-    QPID_BROKER_EXTERN virtual void commit() throw();
-    QPID_BROKER_EXTERN virtual void rollback() throw();
-
-    virtual Message& getMessage() { return *msg; };
-
-    QPID_BROKER_EXTERN virtual void deliverTo(const boost::shared_ptr<Queue>& queue);
-
-    virtual ~TxPublish(){}
-    virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
-
-    QPID_BROKER_EXTERN uint64_t contentSize();
-
-    boost::intrusive_ptr<Message> getMessage() const { return msg; }
-    const std::list<boost::shared_ptr<Queue> >& getQueues() const { return queues; }
-    const std::list<boost::shared_ptr<Queue> >& getPrepared() const { return prepared; }
-};
-}
-}
-
-
-#endif

Added: qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp?rev=1371676&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp Fri Aug 10 12:04:27 2012
@@ -0,0 +1,366 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "MessageTransfer.h"
+#include "qpid/broker/MapHandler.h"
+#include "qpid/broker/Message.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/MessageProperties.h"
+#include "qpid/framing/DeliveryProperties.h"
+#include "qpid/framing/FieldValue.h"
+#include "qpid/framing/frame_functors.h"
+#include "qpid/framing/TypeFilter.h"
+#include "qpid/framing/SendContent.h"
+#include "qpid/log/Statement.h"
+
+using namespace qpid::framing;
+
+namespace qpid {
+namespace broker {
+namespace amqp_0_10 {
+namespace {
+const std::string QMF2("qmf2");
+const std::string PARTIAL("partial");
+}
+/** Builds an empty transfer around a default-constructed command id; required credit starts at zero. */
+MessageTransfer::MessageTransfer() : frames(framing::SequenceNumber()), requiredCredit(0) {}
+/** Builds an empty transfer whose frameset is created with command id @p id; required credit starts at zero. */
+MessageTransfer::MessageTransfer(const framing::SequenceNumber& id) : frames(id), requiredCredit(0) {}
+
+/** @return the content size reported by the underlying frameset */
+uint64_t MessageTransfer::getContentSize() const
+{
+    const uint64_t contentBytes = frames.getContentSize();
+    return contentBytes;
+}
+
+/**
+ * Looks @p key up in the application headers of the message properties.
+ * @return the header value as a string, or an empty string when there are
+ * no message properties or no application headers
+ */
+std::string MessageTransfer::getAnnotationAsString(const std::string& key) const
+{
+    const qpid::framing::MessageProperties* props = getProperties<qpid::framing::MessageProperties>();
+    if (!props || !props->hasApplicationHeaders()) return std::string();
+    return props->getApplicationHeaders().getAsString(key);
+}
+/** Properties and annotations share the same storage here, so this simply delegates to getAnnotationAsString(). */
+std::string MessageTransfer::getPropertyAsString(const std::string& key) const
+{
+    return getAnnotationAsString(key);
+}
+
+/**
+ * Fetches the TTL from the delivery properties.
+ * @param result written only when a TTL is present
+ * @return true if a TTL was found (and @p result set), false otherwise
+ */
+bool MessageTransfer::getTtl(uint64_t& result) const
+{
+    const qpid::framing::DeliveryProperties* delivery = getProperties<qpid::framing::DeliveryProperties>();
+    if (!delivery || !delivery->hasTtl()) return false;
+    result = delivery->getTtl();
+    return true;
+}
+/** @return true when delivery properties exist and carry an expiration */
+bool MessageTransfer::hasExpiration() const
+{
+    const qpid::framing::DeliveryProperties* delivery = getProperties<qpid::framing::DeliveryProperties>();
+    return delivery && delivery->hasExpiration();
+}
+
+/** @return the priority from the delivery properties, or 0 when none is set */
+uint8_t MessageTransfer::getPriority() const
+{
+    const qpid::framing::DeliveryProperties* delivery = getProperties<qpid::framing::DeliveryProperties>();
+    if (!delivery || !delivery->hasPriority()) return 0;
+    return delivery->getPriority();
+}
+
+/**
+ * @return the destination named on the message.transfer method, or an empty
+ * string when the frameset does not hold a message.transfer method
+ */
+std::string MessageTransfer::getExchangeName() const
+{
+    //as<>() can yield null (requiresAccept() guards the same call), so check before use
+    const framing::MessageTransferBody* b = getFrames().as<framing::MessageTransferBody>();
+    if (!b) return std::string();
+    return b->getDestination();
+}
+
+/** @return true when the message.transfer method is present and its accept-mode is 0 (explicit) */
+bool MessageTransfer::requiresAccept() const
+{
+    const framing::MessageTransferBody* transfer = getFrames().as<framing::MessageTransferBody>();
+    if (!transfer) return false;
+    return transfer->getAcceptMode() == 0/*EXPLICIT == 0*/;
+}
+/** @return the credit value last stored by computeRequiredCredit() */
+uint32_t MessageTransfer::getRequiredCredit() const
+{
+    return this->requiredCredit;
+}
+/** Recomputes requiredCredit as the summed body size of the header and content frames. */
+void MessageTransfer::computeRequiredCredit()
+{
+    //only header and content frames pass the filter and contribute to the total
+    qpid::framing::SumBodySize payload;
+    frames.map_if(payload, qpid::framing::TypeFilter2<qpid::framing::HEADER_BODY, qpid::framing::CONTENT_BODY>());
+    requiredCredit = payload.getSize();
+}
+/** Convenience overload: reads the required credit of the MessageTransfer encoding behind @p msg. */
+uint32_t MessageTransfer::getRequiredCredit(const qpid::broker::Message& msg)
+{
+    //TODO: may need to reflect annotations and other modifications in this also
+    const MessageTransfer& transfer = get(msg);
+    return transfer.getRequiredCredit();
+}
+
+/** @return mutable access to the frameset held by this transfer */
+qpid::framing::FrameSet& MessageTransfer::getFrames()
+{
+    return this->frames;
+}
+/** @return read-only access to the frameset held by this transfer */
+const qpid::framing::FrameSet& MessageTransfer::getFrames() const
+{
+    return this->frames;
+}
+/**
+ * Passes the content frames to @p out via a SendContent functor.
+ * @param out handler given to SendContent
+ * @param maxFrameSize frame size limit given to SendContent
+ */
+void MessageTransfer::sendContent(framing::FrameHandler& out, uint16_t maxFrameSize) const
+{
+    //first pass: count the content frames, SendContent needs the total up front
+    qpid::framing::Count counter;
+    frames.map_if(counter, qpid::framing::TypeFilter<qpid::framing::CONTENT_BODY>());
+
+    //second pass: apply the sender to those same frames
+    qpid::framing::SendContent sender(out, maxFrameSize, counter.getCount());
+    frames.map_if(sender, qpid::framing::TypeFilter<qpid::framing::CONTENT_BODY>());
+}
+
+namespace {
+/**
+ * Functor applied to header frames on delivery. Each frame is copied and
+ * handed to the handler; when annotations, ttl, redelivered or timestamp are
+ * supplied the copy's body is cloned first and the values are written into
+ * its MessageProperties / DeliveryProperties.
+ *
+ * Kept in an unnamed namespace: it is only used by sendHeader() in this file.
+ */
+class SendHeader
+{
+  public:
+    SendHeader(FrameHandler& h, bool r, uint64_t t, uint64_t ts, const qpid::types::Variant::Map& a)
+        : handler(h), redelivered(r), ttl(t), timestamp(ts), annotations(a) {}
+
+    void operator()(const AMQFrame& f)
+    {
+        AMQFrame copy = f;
+        const bool annotate = !annotations.empty();
+        const bool restamp = redelivered || ttl || timestamp;
+        if (annotate || restamp) {
+            //modify a clone of the body rather than the one the incoming frame refers to
+            copy.cloneBody();
+            if (annotate) {
+                MessageProperties* props =
+                    copy.castBody<AMQHeaderBody>()->get<MessageProperties>(true);
+                for (qpid::types::Variant::Map::const_iterator i = annotations.begin();
+                     i != annotations.end(); ++i) {
+                    props->getApplicationHeaders().setString(i->first, i->second.asString());
+                }
+            }
+            if (restamp) {
+                DeliveryProperties* dp =
+                    copy.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true);
+                //zero/false values mean "leave the existing property untouched"
+                if (ttl) dp->setTtl(ttl);
+                if (redelivered) dp->setRedelivered(redelivered);
+                if (timestamp) dp->setTimestamp(timestamp);
+            }
+        }
+        handler.handle(copy);
+    }
+  private:
+    FrameHandler& handler;
+    bool redelivered;
+    uint64_t ttl;
+    uint64_t timestamp;
+    //held by reference: the map must outlive this functor
+    const qpid::types::Variant::Map& annotations;
+};
+} // namespace
+
+/**
+ * Applies a SendHeader functor to the header frames, forwarding them to
+ * @p out with the given redelivered flag, ttl, timestamp and annotations.
+ * The maxFrameSize argument is currently unused.
+ */
+void MessageTransfer::sendHeader(framing::FrameHandler& out, uint16_t /*maxFrameSize*/,
+                                 bool redelivered, uint64_t ttl, uint64_t timestamp,
+                                 const qpid::types::Variant::Map& annotations) const
+{
+    SendHeader sender(out, redelivered, ttl, timestamp, annotations);
+    frames.map_if(sender, TypeFilter<HEADER_BODY>());
+}
+/** Placeholder: always reports that immediate delivery is not required; the message is not inspected. */
+bool MessageTransfer::isImmediateDeliveryRequired(const qpid::broker::Message& /*message*/)
+{
+    const bool required = false;//TODO
+    return required;
+}
+
+/** @return the id of the underlying frameset */
+const framing::SequenceNumber& MessageTransfer::getCommandId() const
+{
+    return frames.getId();
+}
+
+/** @return the routing key from the delivery properties, or an empty string when none is set */
+std::string MessageTransfer::getRoutingKey() const
+{
+    const qpid::framing::DeliveryProperties* delivery = getProperties<qpid::framing::DeliveryProperties>();
+    if (!delivery || !delivery->hasRoutingKey()) return std::string();
+    return delivery->getRoutingKey();
+}
+/**
+ * @return true only when a delivery mode is set and equals 2
+ * NOTE(review): 2 is presumably the "persistent" delivery mode - confirm against the generated enum
+ */
+bool MessageTransfer::isPersistent() const
+{
+    const qpid::framing::DeliveryProperties* delivery = getProperties<qpid::framing::DeliveryProperties>();
+    return delivery && delivery->hasDeliveryMode() && delivery->getDeliveryMode() == 2;
+}
+
+/** @return the content of the underlying frameset as a single string */
+std::string MessageTransfer::getContent() const
+{
+    std::string body = frames.getContent();
+    return body;
+}
+
+/**
+ * Decodes two frames from @p buffer, in order, and appends each to the
+ * frameset: first the method frame, then the header frame.
+ */
+void MessageTransfer::decodeHeader(framing::Buffer& buffer)
+{
+    AMQFrame methodFrame;
+    AMQFrame headerFrame;
+
+    //the buffer is consumed sequentially, so the decode order must stay method, header
+    methodFrame.decode(buffer);
+    frames.append(methodFrame);
+
+    headerFrame.decode(buffer);
+    frames.append(headerFrame);
+}
+/**
+ * Decodes whatever remains in @p buffer as a single content frame and
+ * appends it. If nothing remains, the header frames are instead marked as
+ * the last segment.
+ */
+void MessageTransfer::decodeContent(framing::Buffer& buffer)
+{
+    if (!buffer.available()) {
+        //no content: adjust header flags
+        MarkLastSegment marker;
+        frames.map_if(marker, TypeFilter<HEADER_BODY>());
+        return;
+    }
+    //decode all remaining bytes into one content body and add
+    //the resulting frame to the frameset
+    AMQFrame contentFrame((AMQContentBody()));
+    contentFrame.castBody<AMQContentBody>()->decode(buffer, buffer.available());
+    contentFrame.setFirstSegment(false);
+    frames.append(contentFrame);
+}
+
+/**
+ * Writes the message into @p buffer: the method and header frames are
+ * encoded as frames, followed by the body of each content frame.
+ */
+void MessageTransfer::encode(framing::Buffer& buffer) const
+{
+    //method and header frames go out as complete frames
+    EncodeFrame frameEncoder(buffer);
+    frames.map_if(frameEncoder, TypeFilter2<METHOD_BODY, HEADER_BODY>());
+
+    //content frames contribute only their payload
+    encodeContent(buffer);
+}
+
+/** Writes the body of each content frame into @p buffer (no frame headers). */
+void MessageTransfer::encodeContent(framing::Buffer& buffer) const
+{
+    EncodeBody payloadEncoder(buffer);
+    frames.map_if(payloadEncoder, TypeFilter<CONTENT_BODY>());
+}
+
+/** @return the sum of encodedHeaderSize() and encodedContentSize() */
+uint32_t MessageTransfer::encodedSize() const
+{
+    const uint32_t headerBytes = encodedHeaderSize();
+    const uint32_t contentBytes = encodedContentSize();
+    return headerBytes + contentBytes;
+}
+
+/**
+ * @return the frameset's content size, as a 32-bit value
+ * NOTE(review): getContentSize() returns the same quantity as uint64_t, so
+ * very large content would be narrowed here - confirm this is acceptable
+ */
+uint32_t MessageTransfer::encodedContentSize() const
+{
+    return frames.getContentSize();
+}
+
+/** @return the summed frame size of the method and header frames in the frameset */
+uint32_t MessageTransfer::encodedHeaderSize() const
+{
+    //content frames are excluded by the filter
+    SumFrameSize total;
+    frames.map_if(total, TypeFilter2<METHOD_BODY, HEADER_BODY>());
+    return total.getSize();
+}
+
+/** @return true when message properties exist, the app-id is "qmf2" and application headers are present */
+bool MessageTransfer::isQMFv2() const
+{
+    const framing::MessageProperties* props = getProperties<framing::MessageProperties>();
+    if (!props) return false;
+    if (props->getAppId() != QMF2) return false;
+    return props->hasApplicationHeaders();
+}
+
+/** @return true when @p message is encoded as a MessageTransfer and that transfer is a QMFv2 message */
+bool MessageTransfer::isQMFv2(const qpid::broker::Message& message)
+{
+    //other encodings are simply not QMFv2 as far as this check is concerned
+    if (const MessageTransfer* transfer = dynamic_cast<const MessageTransfer*>(&message.getEncoding())) {
+        return transfer->isQMFv2();
+    }
+    return false;
+}
+
+/**
+ * @return true when the correlation-id equals @p correlation, application
+ * headers are present and the "partial" header is not set
+ */
+bool MessageTransfer::isLastQMFResponse(const std::string correlation) const
+{
+    const framing::MessageProperties* props = getProperties<framing::MessageProperties>();
+    if (!props) return false;
+    if (!(props->getCorrelationId() == correlation)) return false;
+    if (!props->hasApplicationHeaders()) return false;
+    return !props->getApplicationHeaders().isSet(PARTIAL);
+}
+
+/** @return true when @p message is encoded as a MessageTransfer and that transfer is the last QMF response for @p correlation */
+bool MessageTransfer::isLastQMFResponse(const qpid::broker::Message& message, const std::string correlation)
+{
+    if (const MessageTransfer* transfer = dynamic_cast<const MessageTransfer*>(&message.getEncoding())) {
+        return transfer->isLastQMFResponse(correlation);
+    }
+    return false;
+}
+
+
+/**
+ * Walks the application headers (if any) and reports each entry to
+ * @p handler. The callback is chosen by the first test the value passes:
+ * empty -> handleVoid, converts to uint64 -> handleUint64, converts to
+ * int64 -> handleInt64, converts to string -> handleString (with an empty
+ * encoding). Anything else is logged at debug level and skipped.
+ */
+void MessageTransfer::processProperties(qpid::broker::MapHandler& handler) const
+{
+    const qpid::framing::MessageProperties* mp = getProperties<qpid::framing::MessageProperties>();
+    if (mp && mp->hasApplicationHeaders()) {
+        //bind by reference: the table is only read here, so copying it on every call is wasted work
+        const FieldTable& ft = mp->getApplicationHeaders();
+        for (FieldTable::const_iterator i = ft.begin(); i != ft.end(); ++i) {
+            qpid::broker::MapHandler::CharSequence key;
+            key.data = i->first.data();
+            key.size = i->first.size();
+            FieldTable::ValuePtr v = i->second;
+            //TODO: something more sophisticated...
+            if (v->empty()) {
+                handler.handleVoid(key);
+            } else if (v->convertsTo<uint64_t>()) {
+                handler.handleUint64(key, v->get<uint64_t>());
+            } else if (v->convertsTo<int64_t>()) {
+                handler.handleInt64(key, v->get<int64_t>());
+            } else if (v->convertsTo<std::string>()) {
+                //s must stay alive for the duration of handleString: value points into it
+                std::string s = v->get<std::string>();
+                qpid::broker::MapHandler::CharSequence value;
+                value.data = s.data();
+                value.size = s.size();
+                qpid::broker::MapHandler::CharSequence encoding; encoding.size = 0; encoding.data = 0;
+                handler.handleString(key, value, encoding);
+            } else {
+                QPID_LOG(debug, "Unhandled key!" << *v);
+            }
+        }
+    }
+}
+
+/** @return the user-id from the message properties, or an empty string when none is set */
+std::string MessageTransfer::getUserId() const
+{
+    const qpid::framing::MessageProperties* props = getProperties<qpid::framing::MessageProperties>();
+    if (!props || !props->hasUserId()) {
+        return std::string();
+    }
+    return props->getUserId();
+}
+/** Builds a transfer holding a copy of frameset @p f; required credit starts at zero. */
+MessageTransfer::MessageTransfer(const qpid::framing::FrameSet& f)
+    : frames(f),
+      requiredCredit(0)
+{}
+
+/**
+ * Creates a new MessageTransfer from this one's frameset and writes every
+ * entry of @p annotations into its application headers as a string.
+ * @return the new message
+ *
+ * NOTE(review): unlike SendHeader, no cloneBody() is called before the
+ * header is modified; whether the copied frameset shares its header body
+ * with this message depends on FrameSet/AMQFrame copy semantics - confirm.
+ * NOTE(review): getHeaders() is dereferenced unchecked - presumably a header
+ * frame is always present by the time merge() is called; verify.
+ */
+boost::intrusive_ptr<PersistableMessage> MessageTransfer::merge(const std::map<std::string, qpid::types::Variant>& annotations) const
+{
+    boost::intrusive_ptr<MessageTransfer> duplicate(new MessageTransfer(this->frames));
+    qpid::framing::AMQHeaderBody* headers = duplicate->frames.getHeaders();
+    qpid::framing::MessageProperties* props = headers->get<qpid::framing::MessageProperties>(true);
+    qpid::types::Variant::Map::const_iterator entry = annotations.begin();
+    while (entry != annotations.end()) {
+        props->getApplicationHeaders().setString(entry->first, entry->second);
+        ++entry;
+    }
+    return duplicate;
+}
+}
+// qpid::broker namespace, TODO: move these elsewhere!
+/**
+ * Serialises the 0-10 transfer behind @p in into @p out: the message is
+ * encoded into a scratch buffer of encodedSize() bytes, which is then read
+ * back into the string.
+ */
+void encode(const Message& in, std::string& out)
+{
+    const amqp_0_10::MessageTransfer& source = amqp_0_10::MessageTransfer::get(in);
+    uint32_t length = source.encodedSize();
+    std::vector<char> scratch(length);
+    qpid::framing::Buffer writer(&(scratch[0]), length);
+    source.encode(writer);
+    //rewind so the bytes just written can be read back out
+    writer.reset();
+    writer.getRawData(out, length);
+}
+/**
+ * Rebuilds a message from the bytes in @p in: header frames are decoded
+ * first, then the content, and @p out is replaced by a Message built on the
+ * resulting transfer.
+ */
+void decode(const std::string& in, Message& out)
+{
+    boost::intrusive_ptr<amqp_0_10::MessageTransfer> decoded(new amqp_0_10::MessageTransfer);
+    //Buffer wants a mutable pointer; the data is only decoded from here
+    char* const bytes = const_cast<char*>(in.data());
+    qpid::framing::Buffer reader(bytes, in.size());
+    decoded->decodeHeader(reader);
+    decoded->decodeContent(reader);
+    out = Message(decoded, decoded);
+}
+
+}} // namespace qpid::broker

Added: qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h?rev=1371676&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h Fri Aug 10 12:04:27 2012
@@ -0,0 +1,132 @@
+#ifndef QPID_BROKER_AMQP_0_10_MESSAGETRANSFER_H
+#define QPID_BROKER_AMQP_0_10_MESSAGETRANSFER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/BrokerImportExport.h"
+#include "qpid/framing/FrameSet.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/PersistableMessage.h"
+#include "qpid/types/Variant.h"
+
+namespace qpid {
+namespace broker {
+class Queue;
+namespace amqp_0_10 {
+
+/**
+ * AMQP 0-10 message.transfer form of a broker message. Holds the FrameSet
+ * of the transfer and exposes it both as a Message::Encoding and as a
+ * PersistableMessage.
+ */
+class MessageTransfer : public qpid::broker::Message::Encoding, public qpid::broker::PersistableMessage
+{
+  public:
+    QPID_BROKER_EXTERN MessageTransfer();
+    QPID_BROKER_EXTERN MessageTransfer(const qpid::framing::SequenceNumber&);
+
+    std::string getRoutingKey() const;
+    bool isPersistent() const;
+    uint8_t getPriority() const;
+    uint64_t getContentSize() const;
+    std::string getPropertyAsString(const std::string& key) const;
+    std::string getAnnotationAsString(const std::string& key) const;
+    bool getTtl(uint64_t&) const;
+    bool hasExpiration() const;
+    std::string getExchangeName() const;
+    void processProperties(MapHandler&) const;
+    std::string getUserId() const;
+
+    bool requiresAccept() const;
+    const qpid::framing::SequenceNumber& getCommandId() const;
+    QPID_BROKER_EXTERN qpid::framing::FrameSet& getFrames();
+    QPID_BROKER_EXTERN const qpid::framing::FrameSet& getFrames() const;
+
+    /**
+     * @return the properties struct of type T from the header body; null
+     * when the frameset has no header body (the result may also be null
+     * when T is absent - callers check it)
+     */
+    template <class T> const T* getProperties() const {
+        const qpid::framing::AMQHeaderBody* p = frames.getHeaders();
+        if (!p) return 0;
+        return p->get<T>();
+    }
+
+    /** Same lookup as getProperties(); the returned pointer is intended to be tested for null. */
+    template <class T> const T* hasProperties() const {
+        const qpid::framing::AMQHeaderBody* p = frames.getHeaders();
+        if (!p) return 0;
+        return p->get<T>();
+    }
+    /** @return the method body viewed as T, as provided by FrameSet::as<T>() */
+    template <class T> const T* getMethod() const {
+        return frames.as<T>();
+    }
+
+    template <class T> T* getMethod() {
+        return frames.as<T>();
+    }
+
+    template <class T> bool isA() const {
+        return frames.isA<T>();
+    }
+
+    /** Erases the properties struct of type T from the header body; no-op when there is no header body. */
+    template <class T> void eraseProperties() {
+        qpid::framing::AMQHeaderBody* p = frames.getHeaders();
+        if (p) p->erase<T>();
+    }
+    std::string getContent() const;
+    uint32_t getRequiredCredit() const;
+    void computeRequiredCredit();
+
+    // NOTE(review): no definition of clearApplicationHeadersFlag() is visible in MessageTransfer.cpp - confirm it exists or drop it
+    void clearApplicationHeadersFlag();
+    void sendContent(framing::FrameHandler& out, uint16_t maxFrameSize) const;
+    void sendHeader(framing::FrameHandler& out, uint16_t maxFrameSize, bool redelivered, uint64_t ttl, uint64_t timestamp, const qpid::types::Variant::Map& annotations) const;
+
+    void decodeHeader(framing::Buffer& buffer);
+    void decodeContent(framing::Buffer& buffer);
+
+    void encode(framing::Buffer& buffer) const;
+    uint32_t encodedSize() const;
+
+    /**
+     * @returns the size of the buffer needed to encode the
+     * 'header' of this message (not just the header frame,
+     * but other meta data e.g.routing key and exchange)
+     */
+    uint32_t encodedHeaderSize() const;
+    boost::intrusive_ptr<PersistableMessage> merge(const std::map<std::string, qpid::types::Variant>& annotations) const;
+
+    QPID_BROKER_EXTERN bool isQMFv2() const;
+    QPID_BROKER_EXTERN bool isLastQMFResponse(const std::string correlation) const;
+
+    static bool isImmediateDeliveryRequired(const qpid::broker::Message& message);
+    static uint32_t getRequiredCredit(const qpid::broker::Message&);
+    /**
+     * @return the MessageTransfer encoding behind @p message
+     * @throws std::bad_cast if the message's encoding is not a MessageTransfer
+     * (the reference form of dynamic_cast is used so a mismatch cannot
+     * silently dereference a null pointer)
+     */
+    static MessageTransfer& get(qpid::broker::Message& message) {
+        return dynamic_cast<MessageTransfer&>(message.getEncoding());
+    }
+    /** Const overload of get(); same std::bad_cast behaviour on an encoding mismatch. */
+    static const MessageTransfer& get(const qpid::broker::Message& message) {
+        return dynamic_cast<const MessageTransfer&>(message.getEncoding());
+    }
+    QPID_BROKER_EXTERN static bool isQMFv2(const qpid::broker::Message& message);
+    QPID_BROKER_EXTERN static bool isLastQMFResponse(const qpid::broker::Message& message, const std::string correlation);
+  private:
+    qpid::framing::FrameSet frames;
+    uint32_t requiredCredit;
+
+    MessageTransfer(const qpid::framing::FrameSet&);
+    // NOTE(review): no definition of encodeHeader() is visible in MessageTransfer.cpp - confirm it exists or drop it
+    void encodeHeader(framing::Buffer& buffer) const;
+    uint32_t encodedContentSize() const;
+    void encodeContent(framing::Buffer& buffer) const;
+};
+}}} // namespace qpid::broker::amqp_0_10
+
+#endif  /*!QPID_BROKER_AMQP_0_10_MESSAGETRANSFER_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp Fri Aug 10 12:04:27 2012
@@ -35,6 +35,7 @@
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/sys/SystemInfo.h"
 #include "qpid/types/Variant.h"
+#include "qpid/log/Statement.h"
 
 namespace qpid {
 namespace ha {

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Fri Aug 10 12:04:27 2012
@@ -24,7 +24,9 @@
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/Connection.h"
 #include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueSettings.h"
 #include "qpid/broker/Link.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
 #include "qpid/framing/FieldTable.h"
 #include "qpid/log/Statement.h"
 #include "qpid/amqp_0_10/Codecs.h"
@@ -117,11 +119,6 @@ const string _QUERY_REQUEST("_query_requ
 const string BROKER("broker");
 const string MEMBERS("members");
 
-bool isQMFv2(const Message& message) {
-    const framing::MessageProperties* props = message.getProperties<framing::MessageProperties>();
-    return props && props->getAppId() == QMF2;
-}
-
 template <class T> bool match(Variant::Map& schema) {
     return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]);
 }
@@ -253,18 +250,15 @@ void BrokerReplicator::route(Deliverable
         haBroker.setStatus(CATCHUP);
         QPID_LOG(notice, logPrefix << "Connected to primary " << primary);
     }
-
-    const framing::FieldTable* headers = msg.getMessage().getApplicationHeaders();
-    const MessageProperties* messageProperties = msg.getMessage().getProperties<MessageProperties>();
     Variant::List list;
     try {
-        if (!isQMFv2(msg.getMessage()) || !headers || !messageProperties)
+        if (!qpid::broker::amqp_0_10::MessageTransfer::isQMFv2(msg.getMessage()))
             throw Exception("Unexpected message, not QMF2 event or query response.");
         // decode as list
-        string content = msg.getMessage().getFrames().getContent();
-        amqp_0_10::ListCodec::decode(content, list);
-        QPID_LOG(trace, "Broker replicator received: " << *messageProperties);
-        if (headers->getAsString(QMF_CONTENT) == EVENT) {
+        string content = msg.getMessage().getContent();
+        qpid::amqp_0_10::ListCodec::decode(content, list);
+
+        if (msg.getMessage().getPropertyAsString(QMF_CONTENT) == EVENT) {
             for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
                 Variant::Map& map = i->asMap();
                 QPID_LOG(trace, "Broker replicator event: " << map);
@@ -278,20 +272,20 @@ void BrokerReplicator::route(Deliverable
                 else if (match<EventUnbind>(schema)) doEventUnbind(values);
                 else if (match<EventMembersUpdate>(schema)) doEventMembersUpdate(values);
             }
-        } else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) {
+        } else if (msg.getMessage().getPropertyAsString(QMF_OPCODE) == QUERY_RESPONSE) {
             for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
                 Variant::Map& map = i->asMap();
                 QPID_LOG(trace, "Broker replicator response: " << map);
                 string type = map[SCHEMA_ID].asMap()[CLASS_NAME].asString();
                 Variant::Map& values = map[VALUES].asMap();
                 framing::FieldTable args;
-                amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
+                qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
                 if      (type == QUEUE) doResponseQueue(values);
                 else if (type == EXCHANGE) doResponseExchange(values);
                 else if (type == BINDING) doResponseBind(values);
                 else if (type == HA_BROKER) doResponseHaBroker(values);
             }
-            if (messageProperties->getCorrelationId() == EXCHANGE && !headers->isSet(PARTIAL)) {
+            if (qpid::broker::amqp_0_10::MessageTransfer::isLastQMFResponse(msg.getMessage(), EXCHANGE)) {
                 // We have received all of the exchange response.
                 alternates.clear();
             }
@@ -309,13 +303,11 @@ void BrokerReplicator::doEventQueueDecla
     Variant::Map argsMap = asMapVoid(values[ARGS]);
     bool autoDel = values[AUTODEL].asBool();
     bool excl = values[EXCL].asBool();
-    if (values[DISP] == CREATED &&
-        replicationTest.isReplicated(CONFIGURATION, argsMap, autoDel, excl))
-    {
+    if (values[DISP] == CREATED && replicationTest.isReplicated(CONFIGURATION, argsMap, autoDel, excl)) {
         string name = values[QNAME].asString();
-        QPID_LOG(debug, logPrefix << "Queue declare event: " << name);
+        QueueSettings settings(values[DURABLE].asBool(), values[AUTODEL].asBool());
         framing::FieldTable args;
-        amqp_0_10::translate(argsMap, args);
+        qpid::amqp_0_10::translate(argsMap, args);
         // If we already have a queue with this name, replace it.
         // The queue was definitely created on the primary.
         if (broker.getQueues().find(name)) {
@@ -323,10 +315,17 @@ void BrokerReplicator::doEventQueueDecla
             broker.getQueues().destroy(name);
             stopQueueReplicator(name);
         }
-        boost::shared_ptr<Queue> queue = createQueue(
-            name, values[DURABLE].asBool(), autoDel, args, values[ALTEX].asString());
-        assert(queue);  // Should be created since we destroed the previous queue above.
-        if (queue) startQueueReplicator(queue);
+        settings.populate(args, settings.storeSettings);
+        std::pair<boost::shared_ptr<Queue>, bool> result =
+            broker.createQueue(
+                name,
+                settings,
+                0 /*i.e. no owner regardless of exclusivity on master*/,
+                values[ALTEX].asString(),
+                userId,
+                remoteHost);
+        assert(result.second);  // Should be true since we destroyed existing queue above
+        startQueueReplicator(result.first);
     }
 }
 
@@ -343,7 +342,7 @@ void BrokerReplicator::doEventQueueDelet
     // sessions may be closed by a "queue deleted" exception.
     string name = values[QNAME].asString();
     boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
-    if (queue && replicationTest.replicateLevel(queue->getSettings())) {
+    if (queue && replicationTest.replicateLevel(queue->getSettings().storeSettings)) {
         QPID_LOG(debug, logPrefix << "Queue delete event: " << name);
         stopQueueReplicator(name);
         broker.deleteQueue(name, userId, remoteHost);
@@ -357,7 +356,7 @@ void BrokerReplicator::doEventExchangeDe
         string name = values[EXNAME].asString();
         QPID_LOG(debug, logPrefix << "Exchange declare event: " << name);
         framing::FieldTable args;
-        amqp_0_10::translate(argsMap, args);
+        qpid::amqp_0_10::translate(argsMap, args);
         // If we already have a exchange with this name, replace it.
         // The exchange was definitely created on the primary.
         if (broker.getExchanges().find(name)) {
@@ -391,10 +390,10 @@ void BrokerReplicator::doEventBind(Varia
     // We only replicate binds for a replicated queue to replicated
     // exchange that both exist locally.
     if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
-        queue && replicationTest.replicateLevel(queue->getSettings()))
+        queue && replicationTest.replicateLevel(queue->getSettings().storeSettings))
     {
         framing::FieldTable args;
-        amqp_0_10::translate(asMapVoid(values[ARGS]), args);
+        qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args);
         string key = values[KEY].asString();
         QPID_LOG(debug, logPrefix << "Bind event: exchange=" << exchange->getName()
                  << " queue=" << queue->getName()
@@ -411,10 +410,10 @@ void BrokerReplicator::doEventUnbind(Var
     // We only replicate unbinds for a replicated queue to replicated
     // exchange that both exist locally.
     if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
-        queue && replicationTest.replicateLevel(queue->getSettings()))
+        queue && replicationTest.replicateLevel(queue->getSettings().storeSettings))
     {
         framing::FieldTable args;
-        amqp_0_10::translate(asMapVoid(values[ARGS]), args);
+        qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args);
         string key = values[KEY].asString();
         QPID_LOG(debug, logPrefix << "Unbind event: exchange=" << exchange->getName()
                  << " queue=" << queue->getName()
@@ -455,7 +454,7 @@ void BrokerReplicator::doResponseQueue(V
     string name(values[NAME].asString());
     QPID_LOG(debug, logPrefix << "Queue response: " << name);
     framing::FieldTable args;
-    amqp_0_10::translate(argsMap, args);
+    qpid::amqp_0_10::translate(argsMap, args);
     boost::shared_ptr<Queue> queue =
         createQueue(name, values[DURABLE].asBool(), values[AUTODELETE].asBool(), args,
                     getAltExchange(values[ALTEXCHANGE]));
@@ -470,7 +469,7 @@ void BrokerReplicator::doResponseExchang
     string name = values[NAME].asString();
     QPID_LOG(debug, logPrefix << "Exchange response: " << name);
     framing::FieldTable args;
-    amqp_0_10::translate(argsMap, args);
+    qpid::amqp_0_10::translate(argsMap, args);
     boost::shared_ptr<Exchange> exchange = createExchange(
         name, values[TYPE].asString(), values[DURABLE].asBool(), args,
         getAltExchange(values[ALTEXCHANGE]));
@@ -507,14 +506,14 @@ void BrokerReplicator::doResponseBind(Va
 
     // Automatically replicate binding if queue and exchange exist and are replicated
     if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
-        queue && replicationTest.replicateLevel(queue->getSettings()))
+        queue && replicationTest.replicateLevel(queue->getSettings().storeSettings))
     {
         string key = values[KEY].asString();
         QPID_LOG(debug, logPrefix << "Bind response: exchange:" << exName
                  << " queue:" << qName
                  << " key:" << key);
         framing::FieldTable args;
-        amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
+        qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
         exchange->bind(queue, key, &args);
     }
 }
@@ -544,7 +543,7 @@ void BrokerReplicator::doResponseHaBroke
 
 void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue)
 {
-    if (replicationTest.replicateLevel(queue->getSettings()) == ALL) {
+    if (replicationTest.replicateLevel(queue->getSettings().storeSettings) == ALL) {
         boost::shared_ptr<QueueReplicator> qr(
             new QueueReplicator(haBroker, queue, link));
         if (!broker.getExchanges().registerExchange(qr))
@@ -570,14 +569,14 @@ boost::shared_ptr<Queue> BrokerReplicato
     const qpid::framing::FieldTable& arguments,
     const std::string& alternateExchange)
 {
+    QueueSettings settings(durable, autodelete);
+    settings.populate(arguments, settings.storeSettings);
     std::pair<boost::shared_ptr<Queue>, bool> result =
         broker.createQueue(
             name,
-            durable,
-            autodelete,
-            0, // no owner regardless of exclusivity on primary
+            settings,
+            0,// no owner regardless of exclusivity on primary
             string(), // Set alternate exchange below
-            arguments,
             userId,
             remoteHost);
     if (result.second) {

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp Fri Aug 10 12:04:27 2012
@@ -180,7 +180,7 @@ void Primary::readyReplica(const Replica
 
 void Primary::queueCreate(const QueuePtr& q) {
     // Throw if there is an invalid replication level in the queue settings.
-    haBroker.getReplicationTest().replicateLevel(q->getSettings());
+    haBroker.getReplicationTest().replicateLevel(q->getSettings().storeSettings);
     Mutex::ScopedLock l(lock);
     for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) {
         i->second->queueCreate(q);

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp Fri Aug 10 12:04:27 2012
@@ -39,10 +39,10 @@ class QueueGuard::QueueObserver : public
 {
   public:
     QueueObserver(QueueGuard& g) : guard(g) {}
-    void enqueued(const broker::QueuedMessage& qm) { guard.enqueued(qm); }
-    void dequeued(const broker::QueuedMessage& qm) { guard.dequeued(qm); }
-    void acquired(const broker::QueuedMessage&) {}
-    void requeued(const broker::QueuedMessage&) {}
+    void enqueued(const broker::Message& m) { guard.enqueued(m); }
+    void dequeued(const broker::Message& m) { guard.dequeued(m); }
+    void acquired(const broker::Message&) {}
+    void requeued(const broker::Message&) {}
   private:
     QueueGuard& guard;
 };
@@ -64,39 +64,47 @@ QueueGuard::QueueGuard(broker::Queue& q,
 QueueGuard::~QueueGuard() { cancel(); }
 
 // NOTE: Called with message lock held.
-void QueueGuard::enqueued(const QueuedMessage& qm) {
-    assert(qm.queue == &queue);
+void QueueGuard::enqueued(const Message& m) {
     // Delay completion
-    QPID_LOG(trace, logPrefix << "Delayed completion of " << qm);
-    qm.payload->getIngressCompletion().startCompleter();
+    QPID_LOG(trace, logPrefix << "Delayed completion of " << m);
+    m.getIngressCompletion()->startCompleter();
     {
         Mutex::ScopedLock l(lock);
-        assert(!delayed.contains(qm.position));
-        delayed += qm.position;
+        if (!delayed.insert(Delayed::value_type(m.getSequence(), m.getIngressCompletion())).second) {
+            QPID_LOG(critical, logPrefix << "Second enqueue for message with sequence " << m.getSequence());
+            assert(false);
+        }
     }
 }
 
 // NOTE: Called with message lock held.
-void QueueGuard::dequeued(const QueuedMessage& qm) {
-    assert(qm.queue == &queue);
-    QPID_LOG(trace, logPrefix << "Dequeued " << qm);
+void QueueGuard::dequeued(const Message& m) {
+    QPID_LOG(trace, logPrefix << "Dequeued " << m);
     ReplicatingSubscription* rs=0;
     {
         Mutex::ScopedLock l(lock);
         rs = subscription;
     }
-    if (rs) rs->dequeued(qm);
-    complete(qm);
+    if (rs) rs->dequeued(m);
+    complete(m.getSequence());
+}
+
+void QueueGuard::completeRange(Delayed::iterator begin, Delayed::iterator end) {
+    for (Delayed::iterator i = begin; i != end; ++i) {
+        QPID_LOG(trace, logPrefix << "Completed " << i->first);
+        i->second->finishCompleter();
+    }
 }
 
 void QueueGuard::cancel() {
     queue.removeObserver(observer);
+    Delayed removed;
     {
         Mutex::ScopedLock l(lock);
         if (delayed.empty()) return; // No need if no delayed messages.
+        delayed.swap(removed);
     }
-    // FIXME aconway 2012-06-15: optimize, only messages in delayed set.
-    queue.eachMessage(boost::bind(&QueueGuard::complete, this, _1));
+    completeRange(removed.begin(), removed.end());
 }
 
 void QueueGuard::attach(ReplicatingSubscription& rs) {
@@ -104,36 +112,39 @@ void QueueGuard::attach(ReplicatingSubsc
     subscription = &rs;
 }
 
-namespace {
-void completeBefore(QueueGuard* guard, SequenceNumber position, const QueuedMessage& qm) {
-    if (qm.position <= position) guard->complete(qm);
-}
-}
-
 bool QueueGuard::subscriptionStart(SequenceNumber position) {
-   // Complete any messages before or at the ReplicatingSubscription start position.
-   // Those messages are already on the backup.
-    if (!delayed.empty() && delayed.front() <= position) {
-        // FIXME aconway 2012-06-15: queue iteration, only messages in delayed
-        queue.eachMessage(boost::bind(&completeBefore, this, position, _1));
+    Delayed removed;
+    {
+        Mutex::ScopedLock l(lock);
+        // Complete any messages before or at the ReplicatingSubscription start position.
+        // Those messages are already on the backup.
+        for (Delayed::iterator i = delayed.begin(); i != delayed.end() && i->first <= position;) {
+            removed.insert(*i);
+            delayed.erase(i++);
+        }
     }
+    completeRange(removed.begin(), removed.end());
     return position >= range.back;
 }
 
-void QueueGuard::complete(const QueuedMessage& qm) {
-    assert(qm.queue == &queue);
+void QueueGuard::complete(SequenceNumber sequence) {
+    boost::intrusive_ptr<broker::AsyncCompletion> m;
     {
         Mutex::ScopedLock l(lock);
         // The same message can be completed twice, by
         // ReplicatingSubscription::acknowledged and dequeued. Remove it
-        // from the set so we only call finishCompleter() once
-        if (delayed.contains(qm.position))
-            delayed -= qm.position;
-        else
-            return;
+        // from the map so we only call finishCompleter() once
+        Delayed::iterator i = delayed.find(sequence);
+        if (i != delayed.end()) {
+            m = i->second;
+            delayed.erase(i);
+        }
+
+    }
+    if (m) {
+        QPID_LOG(trace, logPrefix << "Completed " << sequence);
+        m->finishCompleter();
     }
-    QPID_LOG(trace, logPrefix << "Completed " << qm);
-    qm.payload->getIngressCompletion().finishCompleter();
 }
 
 }} // namespaces qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h Fri Aug 10 12:04:27 2012
@@ -63,15 +63,15 @@ class QueueGuard {
     /** QueueObserver override. Delay completion of the message.
      * NOTE: Called under the queues message lock.
      */
-    void enqueued(const broker::QueuedMessage&);
+    void enqueued(const broker::Message&);
 
     /** QueueObserver override: Complete a delayed message.
      * NOTE: Called under the queues message lock.
      */
-    void dequeued(const broker::QueuedMessage&);
+    void dequeued(const broker::Message&);
 
     /** Complete a delayed message. */
-    void complete(const broker::QueuedMessage&);
+    void complete(framing::SequenceNumber);
 
     /** Complete all delayed messages. */
     void cancel();
@@ -108,10 +108,13 @@ class QueueGuard {
     sys::Mutex lock;
     std::string logPrefix;
     broker::Queue& queue;
-    framing::SequenceSet delayed;
+    typedef std::map<framing::SequenceNumber, boost::intrusive_ptr<broker::AsyncCompletion> > Delayed;
+    Delayed delayed;
     ReplicatingSubscription* subscription;
     boost::shared_ptr<QueueObserver> observer;
     QueueRange range;
+
+    void completeRange(Delayed::iterator begin, Delayed::iterator end);
 };
 }} // namespace qpid::ha
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Fri Aug 10 12:04:27 2012
@@ -120,8 +120,10 @@ void QueueReplicator::initializeBridge(B
     settings.setTable(ReplicatingSubscription::QPID_BROKER_INFO,
                       brokerInfo.asFieldTable());
     SequenceNumber front;
-    if (ReplicatingSubscription::getFront(*queue, front))
+    if (ReplicatingSubscription::getFront(*queue, front)) {
         settings.setInt(ReplicatingSubscription::QPID_FRONT, front);
+        QPID_LOG(debug, "QPID_FRONT for " << queue->getName() << " is " << front);
+    }
     peer.getMessage().subscribe(
         args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/,
         false/*exclusive*/, "", 0, settings);
@@ -137,8 +139,7 @@ void QueueReplicator::initializeBridge(B
 
 namespace {
 template <class T> T decodeContent(Message& m) {
-    std::string content;
-    m.getFrames().getContent(content);
+    std::string content = m.getContent();
     Buffer buffer(const_cast<char*>(content.c_str()), content.size());
     T result;
     result.decode(buffer);
@@ -148,9 +149,7 @@ template <class T> T decodeContent(Messa
 
 void QueueReplicator::dequeue(SequenceNumber n, sys::Mutex::ScopedLock&) {
     // Thread safe: only calls thread safe Queue functions.
-    QueuedMessage message;
-    if (queue->acquireMessageAt(n, message))
-        queue->dequeue(0, message);
+    queue->dequeueMessageAt(n);
 }
 
 // Called in connection thread of the queues bridge to primary.

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp Fri Aug 10 12:04:27 2012
@@ -23,6 +23,7 @@
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/QueueRegistry.h"
+#include "qpid/log/Statement.h"
 #include <boost/bind.hpp>
 
 namespace qpid {

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Fri Aug 10 12:04:27 2012
@@ -27,6 +27,7 @@
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/SessionContext.h"
 #include "qpid/broker/ConnectionState.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/log/Statement.h"
@@ -66,10 +67,10 @@ class DequeueScanner
         at = front - 1;
     }
 
-    void operator()(const QueuedMessage& qm) {
-        if (qm.position >= front && qm.position <= back) {
-            if (qm.position > at+1) subscription.dequeued(at+1, qm.position-1);
-            at = qm.position;
+    void operator()(const Message& m) {
+        if (m.getSequence() >= front && m.getSequence() <= back) {
+            if (m.getSequence() > at+1) subscription.dequeued(at+1, m.getSequence()-1);
+            at = m.getSequence();
         }
     }
 
@@ -90,37 +91,23 @@ string mask(const string& in)
     return DOLLAR + in + INTERNAL;
 }
 
-
-/** Dummy consumer used to get the front position on the queue */
-class GetPositionConsumer : public Consumer
+namespace {
+bool getSequence(const Message& message, SequenceNumber& result)
 {
-  public:
-    GetPositionConsumer() :
-        Consumer("ha.GetPositionConsumer."+types::Uuid(true).str(), false) {}
-    bool deliver(broker::QueuedMessage& ) { return true; }
-    void notify() {}
-    bool filter(boost::intrusive_ptr<broker::Message>) { return true; }
-    bool accept(boost::intrusive_ptr<broker::Message>) { return true; }
-    void cancel() {}
-    void acknowledged(const broker::QueuedMessage&) {}
-    bool browseAcquired() const { return true; }
-    broker::OwnershipToken* getSession() { return 0; }
-};
-
-
+    result = message.getSequence();
+    return true;
+}
+}
 bool ReplicatingSubscription::getNext(
     broker::Queue& q, SequenceNumber from, SequenceNumber& result)
 {
-    boost::shared_ptr<Consumer> c(new GetPositionConsumer);
-    c->setPosition(from);
-    if (!q.dispatch(c)) return false;
-    result = c->getPosition();
-    return true;
+    QueueCursor cursor(REPLICATOR);
+    return q.seek(cursor, boost::bind(&getSequence, _1, boost::ref(result)), from);
 }
 
 bool ReplicatingSubscription::getFront(broker::Queue& q, SequenceNumber& front) {
-    // FIXME aconway 2012-05-23: won't wrap, assumes 0 is < all messages in queue.
-    return getNext(q, 0, front);
+    QueueCursor cursor(REPLICATOR);
+    return q.seek(cursor, boost::bind(&getSequence, _1, boost::ref(front)));
 }
 
 /* Called by SemanticState::consume to create a consumer */
@@ -152,15 +139,14 @@ ReplicatingSubscription::ReplicatingSubs
     const string& name,
     Queue::shared_ptr queue,
     bool ack,
-    bool acquire,
+    bool /*acquire*/,
     bool exclusive,
     const string& tag,
     const string& resumeId,
     uint64_t resumeTtl,
     const framing::FieldTable& arguments
-) : ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag,
+) : ConsumerImpl(parent, name, queue, ack, REPLICATOR, exclusive, tag,
                  resumeId, resumeTtl, arguments),
-    dummy(new Queue(mask(name))),
     ready(false)
 {
     try {
@@ -213,6 +199,8 @@ ReplicatingSubscription::ReplicatingSubs
             queue->eachMessage(boost::ref(scan)); // Remove missing messages in between.
             scan.finish();
             position = backup.back;
+            //move cursor to position
+            queue->seek(*this, position);
         }
         // NOTE: we are assuming that the messages that are on the backup are
         // consistent with those on the primary. If the backup is a replica
@@ -260,32 +248,31 @@ void ReplicatingSubscription::initialize
 }
 
 // Message is delivered in the subscription's connection thread.
-bool ReplicatingSubscription::deliver(QueuedMessage& qm) {
+bool ReplicatingSubscription::deliver(const qpid::broker::QueueCursor& c, const qpid::broker::Message& m) {
+    position = m.getSequence();
     try {
-        // Add position events for the subscribed queue, not the internal event queue.
-        if (qm.queue == getQueue().get()) {
-            QPID_LOG(trace, logPrefix << "Replicating " << qm);
-            {
-                Mutex::ScopedLock l(lock);
-                assert(position == qm.position);
-                // qm.position is the position of the newly enqueued qm on local queue.
-                // backupPosition is latest position on backup queue before enqueueing
-                if (qm.position <= backupPosition)
-                    throw Exception(
-                        QPID_MSG("Expected position >  " << backupPosition
-                                 << " but got " << qm.position));
-                if (qm.position - backupPosition > 1) {
-                    // Position has advanced because of messages dequeued ahead of us.
-                    // Send the position before qm was enqueued.
-                    sendPositionEvent(qm.position-1, l);
-                }
-                // Backup will automatically advance by 1 on delivery of message.
-                backupPosition = qm.position;
+        QPID_LOG(trace, logPrefix << "Replicating " << getQueue()->getName() << "[" << m.getSequence() << "]");
+        {
+            Mutex::ScopedLock l(lock);
+            //FIXME GRS: position is no longer set//assert(position == m.getSequence());
+
+            // m.getSequence() is the position of the newly enqueued message on local queue.
+            // backupPosition is latest position on backup queue before enqueueing
+            if (m.getSequence() <= backupPosition)
+                throw Exception(
+                    QPID_MSG("Expected position >  " << backupPosition
+                             << " but got " << m.getSequence()));
+            if (m.getSequence() - backupPosition > 1) {
+                // Position has advanced because of messages dequeued ahead of us.
+                // Send the position before message was enqueued.
+                sendPositionEvent(m.getSequence()-1, l);
             }
+            // Backup will automatically advance by 1 on delivery of message.
+            backupPosition = m.getSequence();
         }
-        return ConsumerImpl::deliver(qm);
+        return ConsumerImpl::deliver(c, m);
     } catch (const std::exception& e) {
-        QPID_LOG(critical, logPrefix << "Error replicating " << qm
+        QPID_LOG(critical, logPrefix << "Error replicating " << getQueue()->getName() << "[" << m.getSequence() << "]"
                  << ": " << e.what());
         throw;
     }
@@ -310,15 +297,13 @@ void ReplicatingSubscription::cancel()
 }
 
 // Consumer override, called on primary in the backup's IO thread.
-void ReplicatingSubscription::acknowledged(const QueuedMessage& qm) {
-    if (qm.queue == getQueue().get()) { // Don't complete messages on the internal queue
-        // Finish completion of message, it has been acknowledged by the backup.
-        QPID_LOG(trace, logPrefix << "Acknowledged " << qm);
-        guard->complete(qm);
-        // If next message is protected by the guard then we are ready
-        if (qm.position >= guard->getRange().back) setReady();
-    }
-    ConsumerImpl::acknowledged(qm);
+void ReplicatingSubscription::acknowledged(const broker::DeliveryRecord& r) {
+    // Finish completion of message, it has been acknowledged by the backup.
+    QPID_LOG(trace, logPrefix << "Acknowledged " << getQueue()->getName() << "[" << r.getMessageId() << "]");
+    guard->complete(r.getMessageId());
+    // If next message is protected by the guard then we are ready
+    if (r.getMessageId() >= guard->getRange().back) setReady();
+    ConsumerImpl::acknowledged(r);
 }
 
 // Called with lock held. Called in subscription's connection thread.
@@ -341,13 +326,12 @@ void ReplicatingSubscription::sendDequeu
 // Called after the message has been removed
 // from the deque and under the messageLock in the queue. Called in
 // arbitrary connection threads.
-void ReplicatingSubscription::dequeued(const QueuedMessage& qm)
+void ReplicatingSubscription::dequeued(const Message& m)
 {
-    assert (qm.queue == getQueue().get());
-    QPID_LOG(trace, logPrefix << "Dequeued " << qm);
+    QPID_LOG(trace, logPrefix << "Dequeued " << getQueue()->getName() << "[" << m.getSequence() << "]");
     {
         Mutex::ScopedLock l(lock);
-        dequeues.add(qm.position);
+        dequeues.add(m.getSequence());
     }
     notify();                   // Ensure a call to doDispatch
 }
@@ -379,7 +363,7 @@ void ReplicatingSubscription::sendPositi
 void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& buffer)
 {
     //generate event message
-    boost::intrusive_ptr<Message> event = new Message();
+    boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> event(new qpid::broker::amqp_0_10::MessageTransfer());
     AMQFrame method((MessageTransferBody(ProtocolVersion(), string(), 0, 0)));
     AMQFrame header((AMQHeaderBody()));
     AMQFrame content((AMQContentBody()));
@@ -400,10 +384,8 @@ void ReplicatingSubscription::sendEvent(
         event->getFrames().getHeaders()->get<DeliveryProperties>(true);
     props->setRoutingKey(key);
     // Send the event directly to the base consumer implementation.
-    // We don't really need a queue here but we pass a dummy queue
-    // to conform to the consumer API.
-    QueuedMessage qm(dummy.get(), event);
-    ConsumerImpl::deliver(qm);
+    //dummy consumer prevents acknowledgements being handled, which is what we want for events
+    ConsumerImpl::deliver(QueueCursor(), Message(event, 0), boost::shared_ptr<Consumer>());
 }
 
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1371676&r1=1371675&r2=1371676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h Fri Aug 10 12:04:27 2012
@@ -101,15 +101,15 @@ class ReplicatingSubscription : public b
 
     // Called via QueueGuard::dequeued.
     //@return true if the message requires completion.
-    void dequeued(const broker::QueuedMessage& qm);
+    void dequeued(const broker::Message&);
 
     // Called during initial scan for dequeues.
     void dequeued(framing::SequenceNumber first, framing::SequenceNumber last);
 
     // Consumer overrides.
-    bool deliver(broker::QueuedMessage& msg);
+    bool deliver(const broker::QueueCursor& cursor, const broker::Message& msg);
     void cancel();
-    void acknowledged(const broker::QueuedMessage&);
+    void acknowledged(const broker::DeliveryRecord&);
     bool browseAcquired() const { return true; }
     // Hide the "queue deleted" error for a ReplicatingSubscription when a
     // queue is deleted, this is normal and not an error.
@@ -127,8 +127,8 @@ class ReplicatingSubscription : public b
 
   private:
     std::string logPrefix;
-    boost::shared_ptr<broker::Queue> dummy; // Used to send event messages
     framing::SequenceSet dequeues;
+    framing::SequenceNumber position;
     framing::SequenceNumber backupPosition;
     bool ready;
     BrokerInfo info;



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org