You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/20 20:43:26 UTC

svn commit: r1186990 [12/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf...

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/Cluster.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/Cluster.h Thu Oct 20 18:42:46 2011
@@ -56,17 +56,25 @@ namespace qpid {
 
 namespace broker {
 class Message;
+class AclModule;
 }
 
 namespace framing {
+class AMQFrame;
 class AMQBody;
-class Uuid;
+struct Uuid;
+}
+
+namespace sys {
+class Timer;
+class AbsTime;
+class Duration;
 }
 
 namespace cluster {
 
 class Connection;
-class EventFrame;
+struct EventFrame;
 class ClusterTimer;
 class UpdateDataExchange;
 
@@ -89,10 +97,10 @@ class Cluster : private Cpg::Handler, pu
     void initialize();
 
     // Connection map.
-    void addLocalConnection(const ConnectionPtr&); 
-    void addShadowConnection(const ConnectionPtr&); 
-    void erase(const ConnectionId&);       
-    
+    void addLocalConnection(const ConnectionPtr&);
+    void addShadowConnection(const ConnectionPtr&);
+    void erase(const ConnectionId&);
+
     // URLs of current cluster members.
     std::vector<std::string> getIds() const;
     std::vector<Url> getUrls() const;
@@ -107,7 +115,7 @@ class Cluster : private Cpg::Handler, pu
     void updateInRetracted();
     // True if we are expecting to receive catch-up connections.
     bool isExpectingUpdate();
-    
+
     MemberId getId() const;
     broker::Broker& getBroker() const;
     Multicaster& getMulticast() { return mcast; }
@@ -135,6 +143,12 @@ class Cluster : private Cpg::Handler, pu
     bool deferDeliveryImpl(const std::string& queue,
                            const boost::intrusive_ptr<broker::Message>& msg);
 
+    sys::AbsTime getClusterTime();
+    void sendClockUpdate();
+    void clock(const uint64_t time);
+
+    static bool loggable(const framing::AMQFrame&); // True if the frame should be logged.
+
   private:
     typedef sys::Monitor::ScopedLock Lock;
 
@@ -144,10 +158,10 @@ class Cluster : private Cpg::Handler, pu
 
     /** Version number of the cluster protocol, to avoid mixed versions. */
     static const uint32_t CLUSTER_VERSION;
-    
+
     // NB: A dummy Lock& parameter marks functions that must only be
     // called with Cluster::lock  locked.
- 
+
     void leave(Lock&);
     std::vector<std::string> getIds(Lock&) const;
     std::vector<Url> getUrls(Lock&) const;
@@ -156,11 +170,11 @@ class Cluster : private Cpg::Handler, pu
     void brokerShutdown();
 
     // == Called in deliverEventQueue thread
-    void deliveredEvent(const Event&); 
+    void deliveredEvent(const Event&);
 
     // == Called in deliverFrameQueue thread
-    void deliveredFrame(const EventFrame&); 
-    void processFrame(const EventFrame&, Lock&); 
+    void deliveredFrame(const EventFrame&);
+    void processFrame(const EventFrame&, Lock&);
 
     // Cluster controls implement XML methods from cluster.xml.
     void updateRequest(const MemberId&, const std::string&, Lock&);
@@ -180,12 +194,12 @@ class Cluster : private Cpg::Handler, pu
                       const std::string& left,
                       const std::string& joined,
                       Lock& l);
-    void messageExpired(const MemberId&, uint64_t, Lock& l);
     void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&);
     void timerWakeup(const MemberId&, const std::string& name, Lock&);
     void timerDrop(const MemberId&, const std::string& name, Lock&);
     void shutdown(const MemberId&, const framing::Uuid& shutdownId, Lock&);
     void deliverToQueue(const std::string& queue, const std::string& message, Lock&);
+    void clock(const uint64_t time, Lock&);
 
     // Helper functions
     ConnectionPtr getConnection(const EventFrame&, Lock&);
@@ -195,7 +209,7 @@ class Cluster : private Cpg::Handler, pu
     void setReady(Lock&);
     void memberUpdate(Lock&);
     void setClusterId(const framing::Uuid&, Lock&);
-    void erase(const ConnectionId&, Lock&);       
+    void erase(const ConnectionId&, Lock&);
     void requestUpdate(Lock& );
     void initMapCompleted(Lock&);
     void becomeElder(Lock&);
@@ -203,7 +217,7 @@ class Cluster : private Cpg::Handler, pu
     void updateMgmtMembership(Lock&);
 
     // == Called in CPG dispatch thread
-    void deliver( // CPG deliver callback. 
+    void deliver( // CPG deliver callback.
         cpg_handle_t /*handle*/,
         const struct cpg_name *group,
         uint32_t /*nodeid*/,
@@ -212,7 +226,7 @@ class Cluster : private Cpg::Handler, pu
         int /*msg_len*/);
 
     void deliverEvent(const Event&);
-    
+
     void configChange( // CPG config change callback.
         cpg_handle_t /*handle*/,
         const struct cpg_name */*group*/,
@@ -263,7 +277,7 @@ class Cluster : private Cpg::Handler, pu
     // Used only in deliverEventQueue thread or when stalled for update.
     Decoder decoder;
     bool discarding;
-    
+
 
     // Remaining members are protected by lock.
     mutable sys::Monitor lock;
@@ -276,7 +290,7 @@ class Cluster : private Cpg::Handler, pu
         JOINER,  ///< Sent update request, waiting for update offer.
         UPDATEE, ///< Stalled receive queue at update offer, waiting for update to complete.
         CATCHUP, ///< Update complete, unstalled but has not yet seen own "ready" event.
-        READY,   ///< Fully operational 
+        READY,   ///< Fully operational
         OFFER,   ///< Sent an offer, waiting for accept/reject.
         UPDATER, ///< Offer accepted, sending a state update.
         LEFT     ///< Final state, left the cluster.
@@ -296,9 +310,13 @@ class Cluster : private Cpg::Handler, pu
     ErrorCheck error;
     UpdateReceiver updateReceiver;
     ClusterTimer* timer;
+    sys::Timer clockTimer;
+    sys::AbsTime clusterTime;
+    sys::Duration clusterTimeOffset;
+    broker::AclModule* acl;
 
   friend std::ostream& operator<<(std::ostream&, const Cluster&);
-  friend class ClusterDispatcher;
+  friend struct ClusterDispatcher;
 };
 
 }} // namespace qpid::cluster

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/ClusterMap.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/ClusterMap.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/ClusterMap.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/ClusterMap.cpp Thu Oct 20 18:42:46 2011
@@ -50,11 +50,6 @@ void insertFieldTableFromMapValue(FieldT
     ft.setString(vt.first.str(), vt.second.str());
 }
 
-void assignFieldTable(FieldTable& ft, const ClusterMap::Map& map) {
-    ft.clear();
-    for_each(map.begin(), map.end(), bind(&insertFieldTableFromMapValue, ref(ft), _1));
-}
-
 }
 
 ClusterMap::ClusterMap() : frameSeq(0) {}

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/ClusterPlugin.cpp Thu Oct 20 18:42:46 2011
@@ -72,6 +72,7 @@ struct ClusterOptions : public Options {
             ("cluster-cman", optValue(settings.quorum), "Integrate with Cluster Manager (CMAN) cluster.")
 #endif
             ("cluster-size", optValue(settings.size, "N"), "Wait for N cluster members before allowing clients to connect.")
+            ("cluster-clock-interval", optValue(settings.clockInterval,"N"), "How often to broadcast the current time to the cluster nodes, in milliseconds. A value between 5 and 1000 is recommended.")
             ("cluster-read-max", optValue(settings.readMax,"N"), "Experimental: flow-control limit  reads per connection. 0=no limit.")
             ;
     }

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/ClusterSettings.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/ClusterSettings.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/ClusterSettings.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/ClusterSettings.h Thu Oct 20 18:42:46 2011
@@ -35,8 +35,9 @@ struct ClusterSettings {
     size_t readMax;
     std::string username, password, mechanism;
     size_t size;
+    uint16_t clockInterval;
 
-    ClusterSettings() : quorum(false), readMax(10), size(1)
+    ClusterSettings() : quorum(false), readMax(10), size(1), clockInterval(10)
     {}
   
     Url getUrl(uint16_t port) const {

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/ClusterTimer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/ClusterTimer.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/ClusterTimer.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/ClusterTimer.cpp Thu Oct 20 18:42:46 2011
@@ -70,6 +70,7 @@ void ClusterTimer::add(intrusive_ptr<Tim
     if (i != map.end())
         throw Exception(QPID_MSG("Task already exists with name " << task->getName()));
     map[task->getName()] = task;
+
     // Only the elder actually activates the task with the Timer base class.
     if (cluster.isElder()) {
         QPID_LOG(trace, "Elder activating cluster timer task " << task->getName());
@@ -112,6 +113,9 @@ void ClusterTimer::deliverWakeup(const s
     else {
         intrusive_ptr<TimerTask> t = i->second;
         map.erase(i);
+        // Move the nextFireTime so readyToFire() is true. This is to ensure we
+        // don't get an error if the fired task calls setupNextFire()
+        t->setFired();
         Timer::fire(t);
     }
 }

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/Connection.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/Connection.cpp Thu Oct 20 18:42:46 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,6 +24,8 @@
 #include "Cluster.h"
 #include "UpdateReceiver.h"
 #include "qpid/assert.h"
+#include "qpid/broker/DtxAck.h"
+#include "qpid/broker/DtxBuffer.h"
 #include "qpid/broker/SessionState.h"
 #include "qpid/broker/SemanticState.h"
 #include "qpid/broker/TxBuffer.h"
@@ -35,6 +37,7 @@
 #include "qpid/broker/Fairshare.h"
 #include "qpid/broker/Link.h"
 #include "qpid/broker/Bridge.h"
+#include "qpid/broker/StatefulQueueObserver.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/framing/enum.h"
 #include "qpid/framing/AMQFrame.h"
@@ -78,7 +81,7 @@ const std::string shadowPrefix("[shadow]
 Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
                        const std::string& mgmtId,
                        const ConnectionId& id, const qpid::sys::SecuritySettings& external)
-    : cluster(c), self(id), catchUp(false), output(*this, out),
+    : cluster(c), self(id), catchUp(false), announced(false), output(*this, out),
       connectionCtor(&output, cluster.getBroker(), mgmtId, external, false, 0, true),
       expectProtocolHeader(false),
       mcastFrameHandler(cluster.getMulticast(), self),
@@ -90,13 +93,15 @@ Connection::Connection(Cluster& c, sys::
 Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
                        const std::string& mgmtId, MemberId member,
                        bool isCatchUp, bool isLink, const qpid::sys::SecuritySettings& external
-) : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), output(*this, out),
+) : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), announced(false), output(*this, out),
     connectionCtor(&output, cluster.getBroker(),
                    mgmtId,
                    external,
                    isLink,
                    isCatchUp ? ++catchUpId : 0,
-                   isCatchUp),  // isCatchUp => shadow
+                   // The first catch-up connection is not considered a shadow
+                   // as it needs to be authenticated.
+                   isCatchUp && self.second > 1),
     expectProtocolHeader(isLink),
     mcastFrameHandler(cluster.getMulticast(), self),
     updateIn(c.getUpdateReceiver()),
@@ -113,7 +118,7 @@ Connection::Connection(Cluster& c, sys::
         if (!updateIn.nextShadowMgmtId.empty())
             connectionCtor.mgmtId = updateIn.nextShadowMgmtId;
         updateIn.nextShadowMgmtId.clear();
-     }
+    }
     init();
     QPID_LOG(debug, cluster << " local connection " << *this);
 }
@@ -143,7 +148,7 @@ void Connection::init() {
 // Called when we have consumed a read buffer to give credit to the
 // connection layer to continue reading.
 void Connection::giveReadCredit(int credit) {
-    if (cluster.getSettings().readMax && credit) 
+    if (cluster.getSettings().readMax && credit)
         output.giveReadCredit(credit);
 }
 
@@ -166,7 +171,7 @@ void Connection::announce(
         AMQFrame frame;
         while (frame.decode(buf))
             connection->received(frame);
-         connection->setUserId(username);
+        connection->setUserId(username);
     }
     // Do managment actions now that the connection is replicated.
     connection->raiseConnectEvent();
@@ -193,7 +198,7 @@ void Connection::received(framing::AMQFr
                  << *this << ": " << f);
         return;
     }
-    QPID_LOG(trace, cluster << " RECV " << *this << ": " << f);
+    QPID_LOG_IF(trace, Cluster::loggable(f), cluster << " RECV " << *this << ": " << f);
     if (isLocal()) {            // Local catch-up connection.
         currentChannel = f.getChannel();
         if (!framing::invoke(*this, *f.getBody()).wasHandled())
@@ -201,7 +206,7 @@ void Connection::received(framing::AMQFr
     }
     else {             // Shadow or updated catch-up connection.
         if (f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) {
-            if (isShadow()) 
+            if (isShadow())
                 cluster.addShadowConnection(this);
             AMQFrame ok((ConnectionCloseOkBody()));
             connection->getOutput().send(ok);
@@ -213,16 +218,9 @@ void Connection::received(framing::AMQFr
     }
 }
 
-bool Connection::checkUnsupported(const AMQBody& body) {
-    std::string message;
-    if (body.getMethod()) {
-        switch (body.getMethod()->amqpClassId()) {
-          case DTX_CLASS_ID: message = "DTX transactions are not currently supported by cluster."; break;
-        }
-    }
-    if (!message.empty())
-        connection->close(connection::CLOSE_CODE_FRAMING_ERROR, message);
-    return !message.empty();
+bool Connection::checkUnsupported(const AMQBody&) {
+    // Returns true if the command is unsupported. Currently all are supported, so this is always false.
+    return false;
 }
 
 struct GiveReadCreditOnExit {
@@ -241,7 +239,7 @@ void Connection::deliverDoOutput(uint32_
 void Connection::deliveredFrame(const EventFrame& f) {
     GiveReadCreditOnExit gc(*this, f.readCredit);
     assert(!catchUp);
-    currentChannel = f.frame.getChannel(); 
+    currentChannel = f.frame.getChannel();
     if (f.frame.getBody()       // frame can be emtpy with just readCredit
         && !framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol.
         && !checkUnsupported(*f.frame.getBody())) // Unsupported operation.
@@ -255,7 +253,7 @@ void Connection::deliveredFrame(const Ev
     }
 }
 
-// A local connection is closed by the network layer.
+// A local connection is closed by the network layer. Called in the connection thread.
 void Connection::closed() {
     try {
         if (isUpdated()) {
@@ -272,8 +270,9 @@ void Connection::closed() {
             // closed and process any outstanding frames from the cluster
             // until self-delivery of deliver-close.
             output.closeOutput();
-            cluster.getMulticast().mcastControl(
-                ClusterConnectionDeliverCloseBody(), self);
+            if (announced)
+                cluster.getMulticast().mcastControl(
+                    ClusterConnectionDeliverCloseBody(), self);
         }
     }
     catch (const std::exception& e) {
@@ -287,7 +286,7 @@ void Connection::deliverClose () {
     cluster.erase(self);
 }
 
-// Close the connection 
+// Close the connection
 void Connection::close() {
     if (connection.get()) {
         QPID_LOG(debug, cluster << " closed connection " << *this);
@@ -320,10 +319,10 @@ size_t Connection::decode(const char* da
         while (localDecoder.decode(buf))
             received(localDecoder.getFrame());
         if (!wasOpen && connection->isOpen()) {
-            // Connections marked as federation links are allowed to proxy
+            // Connections marked with setUserProxyAuth are allowed to proxy
             // messages with user-ID that doesn't match the connection's
             // authenticated ID. This is important for updates.
-            connection->setFederationLink(isCatchUp());
+            connection->setUserProxyAuth(isCatchUp());
         }
     }
     else {                      // Multicast local connections.
@@ -332,9 +331,9 @@ size_t Connection::decode(const char* da
         if (!checkProtocolHeader(ptr, size)) // Updates ptr
             return 0; // Incomplete header
 
-        if (!connection->isOpen()) 
+        if (!connection->isOpen())
             processInitialFrames(ptr, end-ptr); // Updates ptr
-        
+
         if (connection->isOpen() && end - ptr > 0) {
             // We're multi-casting, we will give read credit on delivery.
             grc.credit = 0;
@@ -384,6 +383,7 @@ void Connection::processInitialFrames(co
                 connection->getUserId(),
                 initialFrames),
             getId());
+        announced = true;
         initialFrames.clear();
     }
 }
@@ -406,11 +406,11 @@ void Connection::shadowSetUser(const std
 
 void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled, const SequenceNumber& position)
 {
-    broker::SemanticState::ConsumerImpl& c = semanticState().find(name);
-    c.position = position;
-    c.setBlocked(blocked);
-    if (notifyEnabled) c.enableNotify(); else c.disableNotify();
-    updateIn.consumerNumbering.add(c.shared_from_this());
+    broker::SemanticState::ConsumerImpl::shared_ptr c = semanticState().find(name);
+    c->position = position;
+    c->setBlocked(blocked);
+    if (notifyEnabled) c->enableNotify(); else c->disableNotify();
+    updateIn.consumerNumbering.add(c);
 }
 
 
@@ -421,7 +421,8 @@ void Connection::sessionState(
     const SequenceNumber& expected,
     const SequenceNumber& received,
     const SequenceSet& unknownCompleted,
-    const SequenceSet& receivedIncomplete)
+    const SequenceSet& receivedIncomplete,
+    bool dtxSelected)
 {
     sessionState().setState(
         replayStart,
@@ -431,8 +432,10 @@ void Connection::sessionState(
         received,
         unknownCompleted,
         receivedIncomplete);
-    QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId());
-    // The output tasks will be added later in the update process. 
+    if (dtxSelected) semanticState().selectDtx();
+    QPID_LOG(debug, cluster << " received session state update for "
+             << sessionState().getId());
+    // The output tasks will be added later in the update process.
     connection->getOutputTasks().removeAll();
 }
 
@@ -441,7 +444,7 @@ void Connection::outputTask(uint16_t cha
     if (!session)
         throw Exception(QPID_MSG(cluster << " channel not attached " << *this
                                  << "[" <<  channel << "] "));
-    OutputTask* task = &session->getSemanticState().find(name);
+    OutputTask* task = session->getSemanticState().find(name).get();
     connection->getOutputTasks().addOutputTask(task);
 }
 
@@ -461,11 +464,24 @@ void Connection::shadowReady(
     output.setSendMax(sendMax);
 }
 
+void Connection::setDtxBuffer(const UpdateReceiver::DtxBufferRef& bufRef) {
+    broker::DtxManager& mgr = cluster.getBroker().getDtxManager();
+    broker::DtxWorkRecord* record = mgr.getWork(bufRef.xid);
+    broker::DtxBuffer::shared_ptr buffer = (*record)[bufRef.index];
+    if (bufRef.suspended)
+        bufRef.semanticState->getSuspendedXids()[bufRef.xid] = buffer;
+    else
+        bufRef.semanticState->setDtxBuffer(buffer);
+}
+
+// Marks the end of the update.
 void Connection::membership(const FieldTable& joiners, const FieldTable& members,
                             const framing::SequenceNumber& frameSeq)
 {
     QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
     updateIn.consumerNumbering.clear();
+    for_each(updateIn.dtxBuffers.begin(), updateIn.dtxBuffers.end(),
+             boost::bind(&Connection::setDtxBuffer, this, _1));
     closeUpdated();
     cluster.updateInDone(ClusterMap(joiners, members, frameSeq));
 }
@@ -478,7 +494,7 @@ void Connection::retractOffer() {
 
 void Connection::closeUpdated() {
     self.second = 0;      // Mark this as completed update connection.
-    if (connection.get()) 
+    if (connection.get())
         connection->close(connection::CLOSE_CODE_NORMAL, "OK");
 }
 
@@ -529,12 +545,20 @@ void Connection::deliveryRecord(const st
             m = getUpdateMessage();
             m.queue = queue.get();
             m.position = position;
-            if (enqueued) queue->updateEnqueued(m); //inform queue of the message 
+            if (enqueued) queue->updateEnqueued(m); //inform queue of the message
         } else {                // Message at original position in original queue
-            m = queue->find(position);
+            queue->find(position, m);
         }
-        if (!m.payload)
-            throw Exception(QPID_MSG("deliveryRecord no update message"));
+        // FIXME aconway 2011-08-19: removed:
+        // if (!m.payload)
+        //      throw Exception(QPID_MSG("deliveryRecord no update message"));
+        //
+        // It seems this could happen legitimately in the case where one
+        // session browses message M, then another session acquires
+        // it. In that case the browser's delivery record is !acquired
+        // but the message is not on its original Queue. In that case
+        // we'll get a deliveryRecord with no payload for the browser.
+        //
     }
 
     broker::DeliveryRecord dr(m, queue, tag, acquired, accepted, windowing, credit);
@@ -542,7 +566,11 @@ void Connection::deliveryRecord(const st
     if (cancelled) dr.cancel(dr.getTag());
     if (completed) dr.complete();
     if (ended) dr.setEnded();   // Exsitance of message
-    semanticState().record(dr); // Part of the session's unacked list.
+
+    if (dtxBuffer)              // Record for next dtx-ack
+        dtxAckRecords.push_back(dr);
+    else
+        semanticState().record(dr); // Record on session's unacked list.
 }
 
 void Connection::queuePosition(const string& qname, const SequenceNumber& position) {
@@ -556,8 +584,46 @@ void Connection::queueFairshareState(con
     }
 }
 
-void Connection::expiryId(uint64_t id) {
-    cluster.getExpiryPolicy().setId(id);
+
+namespace {
+// find a StatefulQueueObserver that matches a given identifier
+class ObserverFinder {
+    const std::string id;
+    boost::shared_ptr<broker::QueueObserver> target;
+    ObserverFinder(const ObserverFinder&) {}
+  public:
+    ObserverFinder(const std::string& _id) : id(_id) {}
+    broker::StatefulQueueObserver *getObserver()
+    {
+        if (target)
+            return dynamic_cast<broker::StatefulQueueObserver *>(target.get());
+        return 0;
+    }
+    void operator() (boost::shared_ptr<broker::QueueObserver> o)
+    {
+        if (!target) {
+            broker::StatefulQueueObserver *p = dynamic_cast<broker::StatefulQueueObserver *>(o.get());
+            if (p && p->getId() == id) {
+                target = o;
+            }
+        }
+    }
+};
+}
+
+
+void Connection::queueObserverState(const std::string& qname, const std::string& observerId, const FieldTable& state)
+{
+    boost::shared_ptr<broker::Queue> queue(findQueue(qname));
+    ObserverFinder finder(observerId);      // find this observer
+    queue->eachObserver<ObserverFinder &>(finder);
+    broker::StatefulQueueObserver *so = finder.getObserver();
+    if (so) {
+        so->setState( state );
+        QPID_LOG(debug, "updated queue observer " << observerId << "'s state on queue " << qname << "; ...");
+        return;
+    }
+    QPID_LOG(error, "Failed to find observer " << observerId << " state on queue " << qname << "; this will result in inconsistencies.");
 }
 
 std::ostream& operator<<(std::ostream& o, const Connection& c) {
@@ -574,6 +640,7 @@ std::ostream& operator<<(std::ostream& o
 void Connection::txStart() {
     txBuffer.reset(new broker::TxBuffer());
 }
+
 void Connection::txAccept(const framing::SequenceSet& acked) {
     txBuffer->enlist(boost::shared_ptr<broker::TxAccept>(
                          new broker::TxAccept(acked, semanticState().getUnacked())));
@@ -589,9 +656,11 @@ void Connection::txEnqueue(const std::st
                          new broker::RecoveredEnqueue(findQueue(queue), getUpdateMessage().payload)));
 }
 
-void Connection::txPublish(const framing::Array& queues, bool delivered) {
-    boost::shared_ptr<broker::TxPublish> txPub(new broker::TxPublish(getUpdateMessage().payload));
-    for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i) 
+void Connection::txPublish(const framing::Array& queues, bool delivered)
+{
+    boost::shared_ptr<broker::TxPublish> txPub(
+        new broker::TxPublish(getUpdateMessage().payload));
+    for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i)
         txPub->deliverTo(findQueue((*i)->get<std::string>()));
     txPub->delivered = delivered;
     txBuffer->enlist(txPub);
@@ -605,6 +674,51 @@ void Connection::accumulatedAck(const qp
     semanticState().setAccumulatedAck(s);
 }
 
+void Connection::dtxStart(const std::string& xid,
+                          bool ended,
+                          bool suspended,
+                          bool failed,
+                          bool expired)
+{
+    dtxBuffer.reset(new broker::DtxBuffer(xid, ended, suspended, failed, expired));
+    txBuffer = dtxBuffer;
+}
+
+void Connection::dtxEnd() {
+    broker::DtxManager& mgr = cluster.getBroker().getDtxManager();
+    std::string xid = dtxBuffer->getXid();
+    if (mgr.exists(xid))
+        mgr.join(xid, dtxBuffer);
+    else
+        mgr.start(xid, dtxBuffer);
+    dtxBuffer.reset();
+    txBuffer.reset();
+}
+
+// Sent after all DeliveryRecords for a dtx-ack have been collected in dtxAckRecords
+void Connection::dtxAck() {
+    dtxBuffer->enlist(
+        boost::shared_ptr<broker::DtxAck>(new broker::DtxAck(dtxAckRecords)));
+    dtxAckRecords.clear();
+}
+
+void Connection::dtxBufferRef(const std::string& xid, uint32_t index, bool suspended) {
+    // Save the association between DtxBuffers and the session so we
+    // can set the DtxBuffers at the end of the update when the
+    // DtxManager has been replicated.
+    updateIn.dtxBuffers.push_back(
+        UpdateReceiver::DtxBufferRef(xid, index, suspended, &semanticState()));
+}
+
+// Sent at end of work record.
+void Connection::dtxWorkRecord(const std::string& xid, bool prepared, uint32_t timeout)
+{
+    broker::DtxManager& mgr = cluster.getBroker().getDtxManager();
+    if (timeout) mgr.setTimeout(xid, timeout);
+    if (prepared) mgr.prepare(xid);
+}
+
+
 void Connection::exchange(const std::string& encoded) {
     Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
     broker::Exchange::shared_ptr ex = broker::Exchange::decode(cluster.getBroker().getExchanges(), buf);
@@ -614,12 +728,6 @@ void Connection::exchange(const std::str
     QPID_LOG(debug, cluster << " updated exchange " << ex->getName());
 }
 
-void Connection::queue(const std::string& encoded) {
-    Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
-    broker::Queue::shared_ptr q = broker::Queue::decode(cluster.getBroker().getQueues(), buf);
-    QPID_LOG(debug, cluster << " updated queue " << q->getName());
-}
-
 void Connection::sessionError(uint16_t , const std::string& msg) {
     // Ignore errors before isOpen(), we're not multicasting yet.
     if (connection->isOpen())
@@ -678,6 +786,23 @@ void Connection::config(const std::strin
     else throw Exception(QPID_MSG("Update failed, invalid kind of config: " << kind));
 }
 
+void Connection::doCatchupIoCallbacks() {
+    // We need to process IO callbacks during the catch-up phase in
+    // order to service asynchronous completions for messages
+    // transferred during catch-up.
+
+    if (catchUp) getBrokerConnection()->doIoCallbacks();
+}
+
+void Connection::clock(uint64_t time) {
+    QPID_LOG(debug, "Cluster connection received time update");
+    cluster.clock(time);
+}
+
+void Connection::queueDequeueSincePurgeState(const std::string& qname, uint32_t dequeueSincePurge) {
+    boost::shared_ptr<broker::Queue> queue(findQueue(qname));
+    queue->setDequeueSincePurge(dequeueSincePurge);
+}
 
 }} // Namespace qpid::cluster
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/Connection.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/Connection.h Thu Oct 20 18:42:46 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,11 +24,12 @@
 
 #include "types.h"
 #include "OutputInterceptor.h"
-#include "EventFrame.h"
 #include "McastFrameHandler.h"
 #include "UpdateReceiver.h"
 
+#include "qpid/RefCounted.h"
 #include "qpid/broker/Connection.h"
+#include "qpid/broker/DeliveryRecord.h"
 #include "qpid/broker/SecureConnection.h"
 #include "qpid/broker/SemanticState.h"
 #include "qpid/amqp_0_10/Connection.h"
@@ -47,7 +48,7 @@ namespace framing { class AMQFrame; }
 
 namespace broker {
 class SemanticState;
-class QueuedMessage;
+struct QueuedMessage;
 class TxBuffer;
 class TxAccept;
 }
@@ -55,6 +56,7 @@ class TxAccept;
 namespace cluster {
 class Cluster;
 class Event;
+struct EventFrame;
 
 /** Intercept broker::Connection calls for shadow and local cluster connections. */
 class Connection :
@@ -62,7 +64,7 @@ class Connection :
         public sys::ConnectionInputHandler,
         public framing::AMQP_AllOperations::ClusterConnectionHandler,
         private broker::Connection::ErrorListener
-        
+
 {
   public:
 
@@ -73,7 +75,7 @@ class Connection :
     Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& mgmtId, const ConnectionId& id,
                const qpid::sys::SecuritySettings& external);
     ~Connection();
-    
+
     ConnectionId getId() const { return self; }
     broker::Connection* getBrokerConnection() { return connection.get(); }
     const broker::Connection* getBrokerConnection() const { return connection.get(); }
@@ -108,9 +110,9 @@ class Connection :
     void deliveredFrame(const EventFrame&);
 
     void consumerState(const std::string& name, bool blocked, bool notifyEnabled, const qpid::framing::SequenceNumber& position);
-    
+
     // ==== Used in catch-up mode to build initial state.
-    // 
+    //
     // State update methods.
     void shadowPrepare(const std::string&);
 
@@ -122,10 +124,11 @@ class Connection :
                       const framing::SequenceNumber& expected,
                       const framing::SequenceNumber& received,
                       const framing::SequenceSet& unknownCompleted,
-                      const SequenceSet& receivedIncomplete);
-    
+                      const SequenceSet& receivedIncomplete,
+                      bool dtxSelected);
+
     void outputTask(uint16_t channel, const std::string& name);
-    
+
     void shadowReady(uint64_t memberId,
                      uint64_t connectionId,
                      const std::string& managementId,
@@ -153,7 +156,7 @@ class Connection :
 
     void queuePosition(const std::string&, const framing::SequenceNumber&);
     void queueFairshareState(const std::string&, const uint8_t priority, const uint8_t count);
-    void expiryId(uint64_t);
+    void queueObserverState(const std::string&, const std::string&, const framing::FieldTable&);
 
     void txStart();
     void txAccept(const framing::SequenceSet&);
@@ -163,8 +166,18 @@ class Connection :
     void txEnd();
     void accumulatedAck(const framing::SequenceSet&);
 
-    // Encoded queue/exchange replication.
-    void queue(const std::string& encoded);
+    // Dtx state
+    void dtxStart(const std::string& xid,
+                  bool ended,
+                  bool suspended,
+                  bool failed,
+                  bool expired);
+    void dtxEnd();
+    void dtxAck();
+    void dtxBufferRef(const std::string& xid, uint32_t index, bool suspended);
+    void dtxWorkRecord(const std::string& xid, bool prepared, uint32_t timeout);
+
+    // Encoded exchange replication.
     void exchange(const std::string& encoded);
 
     void giveReadCredit(int credit);
@@ -189,6 +202,12 @@ class Connection :
 
     void setSecureConnection ( broker::SecureConnection * sc );
 
+    void doCatchupIoCallbacks();
+
+    void clock(uint64_t time);
+
+    void queueDequeueSincePurgeState(const std::string&, uint32_t);
+
   private:
     struct NullFrameHandler : public framing::FrameHandler {
         void handle(framing::AMQFrame&) {}
@@ -233,7 +252,7 @@ class Connection :
     // Error listener functions
     void connectionError(const std::string&);
     void sessionError(uint16_t channel, const std::string&);
-    
+
     void init();
     bool checkUnsupported(const framing::AMQBody& body);
     void deliverDoOutput(uint32_t limit);
@@ -245,10 +264,11 @@ class Connection :
     broker::SemanticState& semanticState();
     broker::QueuedMessage getUpdateMessage();
     void closeUpdated();
-
+    void setDtxBuffer(const UpdateReceiver::DtxBuffers::value_type &);
     Cluster& cluster;
     ConnectionId self;
     bool catchUp;
+    bool announced;
     OutputInterceptor output;
     framing::FrameDecoder localDecoder;
     ConnectionCtor connectionCtor;
@@ -256,6 +276,9 @@ class Connection :
     framing::SequenceNumber deliverSeq;
     framing::ChannelId currentChannel;
     boost::shared_ptr<broker::TxBuffer> txBuffer;
+    boost::shared_ptr<broker::DtxBuffer> dtxBuffer;
+    broker::DeliveryRecords dtxAckRecords;
+    broker::DtxWorkRecord* dtxCurrent;
     bool expectProtocolHeader;
     McastFrameHandler mcastFrameHandler;
     UpdateReceiver& updateIn;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/Decoder.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/Decoder.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/Decoder.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/Decoder.h Thu Oct 20 18:42:46 2011
@@ -31,7 +31,7 @@
 namespace qpid {
 namespace cluster {
 
-class EventFrame;
+struct EventFrame;
 class EventHeader;
 
 /**

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/ErrorCheck.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/ErrorCheck.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/ErrorCheck.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/ErrorCheck.h Thu Oct 20 18:42:46 2011
@@ -33,7 +33,7 @@
 namespace qpid {
 namespace cluster {
 
-class EventFrame;
+struct EventFrame;
 class Cluster;
 class Multicaster;
 class Connection;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/Event.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/Event.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/Event.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/Event.cpp Thu Oct 20 18:42:46 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,6 +23,7 @@
 #include "qpid/cluster/Cpg.h"
 #include "qpid/framing/Buffer.h"
 #include "qpid/framing/AMQFrame.h"
+#include "qpid/RefCountedBuffer.h"
 #include "qpid/assert.h"
 #include <ostream>
 #include <iterator>

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/Event.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/Event.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/Event.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/Event.h Thu Oct 20 18:42:46 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,7 +23,7 @@
  */
 
 #include "qpid/cluster/types.h"
-#include "qpid/RefCountedBuffer.h"
+#include "qpid/BufferRef.h"
 #include "qpid/framing/AMQFrame.h"
 #include <sys/uio.h>            // For iovec
 #include <iosfwd>
@@ -53,7 +53,7 @@ class EventHeader {
 
     /** Size of payload data, excluding header. */
     size_t getSize() const { return size; }
-    /** Size of header + payload. */ 
+    /** Size of header + payload. */
     size_t getStoreSize() const { return size + HEADER_SIZE; }
 
     bool isCluster() const { return connectionId.getNumber() == 0; }
@@ -62,7 +62,7 @@ class EventHeader {
 
   protected:
     static const size_t HEADER_SIZE;
-    
+
     EventType type;
     ConnectionId connectionId;
     size_t size;
@@ -86,25 +86,25 @@ class Event : public EventHeader {
 
     /** Create a control event. */
     static Event control(const framing::AMQFrame&, const ConnectionId&);
-    
+
     // Data excluding header.
-    char* getData() { return store + HEADER_SIZE; }
-    const char* getData() const { return store + HEADER_SIZE; }
+    char* getData() { return store.begin() + HEADER_SIZE; }
+    const char* getData() const { return store.begin() + HEADER_SIZE; }
 
     // Store including header
-    char* getStore() { return store; }
-    const char* getStore() const { return store; }
+    char* getStore() { return store.begin(); }
+    const char* getStore() const { return store.begin(); }
+
+    const framing::AMQFrame& getFrame() const;
 
-    const framing::AMQFrame& getFrame() const;        
-    
     operator framing::Buffer() const;
 
     iovec toIovec() const;
-    
+
   private:
     void encodeHeader() const;
 
-    RefCountedBuffer::pointer store;
+    BufferRef store;
     mutable framing::AMQFrame frame;
 };
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/EventFrame.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/EventFrame.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/EventFrame.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/EventFrame.h Thu Oct 20 18:42:46 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -48,7 +48,7 @@ struct EventFrame
 
 
     ConnectionId connectionId;
-    framing::AMQFrame frame;   
+    framing::AMQFrame frame;
     int readCredit; ///< last frame in an event, give credit when processed.
     EventType type;
 };

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/ExpiryPolicy.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/ExpiryPolicy.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/ExpiryPolicy.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/ExpiryPolicy.cpp Thu Oct 20 18:42:46 2011
@@ -21,106 +21,21 @@
 
 #include "qpid/broker/Message.h"
 #include "qpid/cluster/ExpiryPolicy.h"
-#include "qpid/cluster/Multicaster.h"
-#include "qpid/framing/ClusterMessageExpiredBody.h"
+#include "qpid/cluster/Cluster.h"
 #include "qpid/sys/Time.h"
-#include "qpid/sys/Timer.h"
 #include "qpid/log/Statement.h"
 
 namespace qpid {
 namespace cluster {
 
-ExpiryPolicy::ExpiryPolicy(Multicaster& m, const MemberId& id, sys::Timer& t)
-    : expiryId(1), expiredPolicy(new Expired), mcast(m), memberId(id), timer(t) {}
+ExpiryPolicy::ExpiryPolicy(Cluster& cluster) : cluster(cluster) {}
 
-struct ExpiryTask : public sys::TimerTask {
-    ExpiryTask(const boost::intrusive_ptr<ExpiryPolicy>& policy, uint64_t id, sys::AbsTime when)
-        : TimerTask(when,"ExpiryPolicy"), expiryPolicy(policy), expiryId(id) {}
-    void fire() { expiryPolicy->sendExpire(expiryId); }
-    boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
-    const uint64_t expiryId;
-};
-
-// Called while receiving an update
-void ExpiryPolicy::setId(uint64_t id) {
-    sys::Mutex::ScopedLock l(lock);
-    expiryId = id;
-}
-
-// Called while giving an update
-uint64_t ExpiryPolicy::getId() const {
-    sys::Mutex::ScopedLock l(lock);
-    return expiryId;
-}
-
-// Called in enqueuing connection thread
-void ExpiryPolicy::willExpire(broker::Message& m) {
-    uint64_t id;
-    {
-        // When messages are fanned out to multiple queues, update sends
-        // them as independenty messages so we can have multiple messages
-        // with the same expiry ID.
-        //
-        sys::Mutex::ScopedLock l(lock);
-        id = expiryId++;
-        if (!id) {              // This is an update of an already-expired message.
-            m.setExpiryPolicy(expiredPolicy);
-        }
-        else {
-            assert(unexpiredByMessage.find(&m) == unexpiredByMessage.end());
-            // If this is an update, the id may already exist
-            unexpiredById.insert(IdMessageMap::value_type(id, &m));
-            unexpiredByMessage[&m] = id;
-        }
-    }
-    timer.add(new ExpiryTask(this, id, m.getExpiration()));
-}
-
-// Called in dequeueing connection thread
-void ExpiryPolicy::forget(broker::Message& m) {
-    sys::Mutex::ScopedLock l(lock);
-    MessageIdMap::iterator i = unexpiredByMessage.find(&m);
-    assert(i != unexpiredByMessage.end());
-    unexpiredById.erase(i->second);
-    unexpiredByMessage.erase(i);
-}
-
-// Called in dequeueing connection or cleanup thread.
 bool ExpiryPolicy::hasExpired(broker::Message& m) {
-    sys::Mutex::ScopedLock l(lock);
-    return unexpiredByMessage.find(&m) == unexpiredByMessage.end();
-}
-
-// Called in timer thread
-void ExpiryPolicy::sendExpire(uint64_t id) {
-    {
-        sys::Mutex::ScopedLock l(lock);
-        // Don't multicast an expiry notice if message is already forgotten.
-        if (unexpiredById.find(id) == unexpiredById.end()) return;
-    }
-    mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(), id), memberId);
+    return m.getExpiration() < cluster.getClusterTime();
 }
 
-// Called in CPG deliver thread.
-void ExpiryPolicy::deliverExpire(uint64_t id) {
-    sys::Mutex::ScopedLock l(lock);
-    std::pair<IdMessageMap::iterator, IdMessageMap::iterator> expired = unexpiredById.equal_range(id);
-    IdMessageMap::iterator i = expired.first;
-    while (i != expired.second) {
-        i->second->setExpiryPolicy(expiredPolicy); // hasExpired() == true; 
-        unexpiredByMessage.erase(i->second);
-        unexpiredById.erase(i++);
-    }
+sys::AbsTime ExpiryPolicy::getCurrentTime() {
+    return cluster.getClusterTime();
 }
 
-// Called in update thread on the updater.
-boost::optional<uint64_t> ExpiryPolicy::getId(broker::Message& m) {
-    sys::Mutex::ScopedLock l(lock);
-    MessageIdMap::iterator i = unexpiredByMessage.find(&m);
-    return i == unexpiredByMessage.end() ? boost::optional<uint64_t>() : i->second;
-}
-
-bool ExpiryPolicy::Expired::hasExpired(broker::Message&) { return true; }
-void ExpiryPolicy::Expired::willExpire(broker::Message&) { }
-
 }} // namespace qpid::cluster

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/ExpiryPolicy.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/ExpiryPolicy.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/ExpiryPolicy.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/ExpiryPolicy.h Thu Oct 20 18:42:46 2011
@@ -36,12 +36,8 @@ namespace broker {
 class Message;
 }
 
-namespace sys {
-class Timer;
-}
-
 namespace cluster {
-class Multicaster;
+class Cluster;
 
 /**
  * Cluster expiry policy
@@ -49,43 +45,13 @@ class Multicaster;
 class ExpiryPolicy : public broker::ExpiryPolicy
 {
   public:
-    ExpiryPolicy(Multicaster&, const MemberId&, sys::Timer&);
+    ExpiryPolicy(Cluster& cluster);
 
-    void willExpire(broker::Message&);
     bool hasExpired(broker::Message&);
-    void forget(broker::Message&);
-
-    // Send expiration notice to cluster.
-    void sendExpire(uint64_t);
-
-    // Cluster delivers expiry notice.
-    void deliverExpire(uint64_t);
+    qpid::sys::AbsTime getCurrentTime();
 
-    void setId(uint64_t id);
-    uint64_t getId() const;
-    
-    boost::optional<uint64_t> getId(broker::Message&);
-    
   private:
-    typedef std::map<broker::Message*,  uint64_t> MessageIdMap;
-    // When messages are fanned out to multiple queues, update sends
-    // them as independenty messages so we can have multiple messages
-    // with the same expiry ID.
-    typedef std::multimap<uint64_t, broker::Message*> IdMessageMap;
-
-    struct Expired : public broker::ExpiryPolicy {
-        bool hasExpired(broker::Message&);
-        void willExpire(broker::Message&);
-    };
-
-    mutable sys::Mutex lock;
-    MessageIdMap unexpiredByMessage;
-    IdMessageMap unexpiredById;
-    uint64_t expiryId;
-    boost::intrusive_ptr<Expired> expiredPolicy;
-    Multicaster& mcast;
-    MemberId memberId;
-    sys::Timer& timer;
+    Cluster& cluster;
 };
 
 }} // namespace qpid::cluster

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/FailoverExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/FailoverExchange.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/FailoverExchange.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/FailoverExchange.cpp Thu Oct 20 18:42:46 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -39,8 +39,10 @@ using namespace broker;
 using namespace framing;
 
 const string FailoverExchange::typeName("amq.failover");
-    
-FailoverExchange::FailoverExchange(management::Manageable* parent, Broker* b)  : Exchange(typeName, parent, b ) {
+
+FailoverExchange::FailoverExchange(management::Manageable* parent, Broker* b)
+    : Exchange(typeName, parent, b ), ready(false)
+{
     if (mgmtExchange != 0)
         mgmtExchange->set_type(typeName);
 }
@@ -53,16 +55,17 @@ void FailoverExchange::setUrls(const vec
 void FailoverExchange::updateUrls(const vector<Url>& u) {
     Lock l(lock);
     urls=u;
-    if (urls.empty()) return;
-    std::for_each(queues.begin(), queues.end(),
-                  boost::bind(&FailoverExchange::sendUpdate, this, _1));
+    if (ready && !urls.empty()) {
+        std::for_each(queues.begin(), queues.end(),
+                      boost::bind(&FailoverExchange::sendUpdate, this, _1));
+    }
 }
 
 string FailoverExchange::getType() const { return typeName; }
 
 bool FailoverExchange::bind(Queue::shared_ptr queue, const string&, const framing::FieldTable*) {
     Lock l(lock);
-    sendUpdate(queue);
+    if (ready) sendUpdate(queue);
     return queues.insert(queue).second;
 }
 
@@ -84,7 +87,7 @@ void FailoverExchange::sendUpdate(const 
     // Called with lock held.
     if (urls.empty()) return;
     framing::Array array(0x95);
-    for (Urls::const_iterator i = urls.begin(); i != urls.end(); ++i) 
+    for (Urls::const_iterator i = urls.begin(); i != urls.end(); ++i)
         array.add(boost::shared_ptr<Str16Value>(new Str16Value(i->str())));
     const ProtocolVersion v;
     boost::intrusive_ptr<Message> msg(new Message);
@@ -96,9 +99,12 @@ void FailoverExchange::sendUpdate(const 
     header.get<MessageProperties>(true)->getApplicationHeaders().setArray(typeName, array);
     AMQFrame headerFrame(header);
     headerFrame.setFirstSegment(false);
-    msg->getFrames().append(headerFrame);    
+    msg->getFrames().append(headerFrame);
     DeliverableMessage(msg).deliverTo(queue);
 }
 
+void FailoverExchange::setReady() {
+    Lock l(lock); ready = true; // bind() and updateUrls() read 'ready' under 'lock'; write it under the same lock.
+}
 
 }} // namespace cluster

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/FailoverExchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/FailoverExchange.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/FailoverExchange.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/FailoverExchange.h Thu Oct 20 18:42:46 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -46,6 +46,8 @@ class FailoverExchange : public broker::
     void setUrls(const std::vector<Url>&);
     /** Set the URLs and send an update.*/
     void updateUrls(const std::vector<Url>&);
+    /** Flag the failover exchange as ready to generate updates (caught up) */
+    void setReady();
 
     // Exchange overrides
     std::string getType() const;
@@ -56,7 +58,7 @@ class FailoverExchange : public broker::
 
   private:
     void sendUpdate(const boost::shared_ptr<broker::Queue>&);
-    
+
     typedef sys::Mutex::ScopedLock Lock;
     typedef std::vector<Url> Urls;
     typedef std::set<boost::shared_ptr<broker::Queue> > Queues;
@@ -64,7 +66,7 @@ class FailoverExchange : public broker::
     sys::Mutex lock;
     Urls urls;
     Queues queues;
-    
+    bool ready;
 };
 }} // namespace qpid::cluster
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/Multicaster.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/Multicaster.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/Multicaster.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/Multicaster.cpp Thu Oct 20 18:42:46 2011
@@ -21,6 +21,7 @@
 
 #include "qpid/cluster/Multicaster.h"
 #include "qpid/cluster/Cpg.h"
+#include "qpid/cluster/Cluster.h"
 #include "qpid/log/Statement.h"
 #include "qpid/framing/AMQBody.h"
 #include "qpid/framing/AMQFrame.h"
@@ -58,7 +59,7 @@ void Multicaster::mcast(const Event& e) 
             return;
         }
     }
-    QPID_LOG(trace, "MCAST " << e);
+    QPID_LOG_IF(trace, e.isControl() && Cluster::loggable(e.getFrame()), "MCAST " << e);
     if (bypass) {               // direct, don't queue
         iovec iov = e.toIovec();
         while (!cpg.mcast(&iov, 1))

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/OutputInterceptor.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/OutputInterceptor.cpp Thu Oct 20 18:42:46 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -45,12 +45,11 @@ void OutputInterceptor::send(framing::AM
 }
 
 void OutputInterceptor::activateOutput() {
-    if (parent.isCatchUp()) {
-        sys::Mutex::ScopedLock l(lock);
+    sys::Mutex::ScopedLock l(lock);
+    if (parent.isCatchUp())
         next->activateOutput();
-    }
     else
-        sendDoOutput(sendMax);
+        sendDoOutput(sendMax, l);
 }
 
 void OutputInterceptor::abort() {
@@ -66,29 +65,38 @@ void OutputInterceptor::giveReadCredit(i
 }
 
 // Called in write thread when the IO layer has no more data to write.
-// We do nothing in the write thread, we run doOutput only on delivery
-// of doOutput requests.
-bool OutputInterceptor::doOutput() { return false; }
+// We only process IO callbacks in the write thread during catch-up.
+// Normally we run doOutput only on delivery of doOutput requests.
+bool OutputInterceptor::doOutput() {
+    parent.doCatchupIoCallbacks(); // Runs IO callbacks only while the parent connection is in catch-up.
+    return false; // Always false. NOTE(review): presumably tells the IO layer there is nothing more to write - confirm.
+}
 
-// Send output up to limit, calculate new limit. 
+// Send output up to limit, calculate new limit.
 void OutputInterceptor::deliverDoOutput(uint32_t limit) {
+    sys::Mutex::ScopedLock l(lock);
     sentDoOutput = false;
     sendMax = limit;
     size_t newLimit = limit;
     if (parent.isLocal()) {
-        size_t buffered = getBuffered();
+        size_t buffered = next->getBuffered();
         if (buffered == 0 && sent == sendMax) // Could have sent more, increase the limit.
-            newLimit = sendMax*2; 
+            newLimit = sendMax*2;
         else if (buffered > 0 && sent > 1) // Data left unsent, reduce the limit.
             newLimit = (sendMax + sent) / 2;
     }
     sent = 0;
-    while (sent < limit && parent.getBrokerConnection()->doOutput())
+    while (sent < limit) {
+        {
+            sys::Mutex::ScopedUnlock u(lock);
+            if (!parent.getBrokerConnection()->doOutput()) break;
+        }
         ++sent;
-    if (sent == limit) sendDoOutput(newLimit);
+    }
+    if (sent == limit) sendDoOutput(newLimit, l);
 }
 
-void OutputInterceptor::sendDoOutput(size_t newLimit) {
+void OutputInterceptor::sendDoOutput(size_t newLimit, const sys::Mutex::ScopedLock&) {
     if (parent.isLocal() && !sentDoOutput && !closing) {
         sentDoOutput = true;
         parent.getCluster().getMulticast().mcastControl(
@@ -97,6 +105,7 @@ void OutputInterceptor::sendDoOutput(siz
     }
 }
 
+// Called in connection thread when local connection closes.
 void OutputInterceptor::closeOutput() {
     sys::Mutex::ScopedLock l(lock);
     closing = true;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/OutputInterceptor.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/OutputInterceptor.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/OutputInterceptor.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/OutputInterceptor.h Thu Oct 20 18:42:46 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -58,13 +58,13 @@ class OutputInterceptor : public sys::Co
 
     uint32_t getSendMax() const { return sendMax; }
     void setSendMax(uint32_t sendMax_) { sendMax=sendMax_; }
-    
+
     cluster::Connection& parent;
-    
+
   private:
     typedef sys::Mutex::ScopedLock Locker;
 
-    void sendDoOutput(size_t newLimit);
+    void sendDoOutput(size_t newLimit, const sys::Mutex::ScopedLock&);
 
     mutable sys::Mutex lock;
     bool closing;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/SecureConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/SecureConnectionFactory.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/SecureConnectionFactory.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/SecureConnectionFactory.cpp Thu Oct 20 18:42:46 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -48,7 +48,7 @@ SecureConnectionFactory::create(Protocol
     if (clusterCodec) {
         SecureConnectionPtr sc(new SecureConnection());
         clusterCodec->setSecureConnection(sc.get());
-        sc->setCodec(codec);        
+        sc->setCodec(codec);
         return sc.release();
     }
     return 0;
@@ -63,7 +63,7 @@ SecureConnectionFactory::create(sys::Out
     if (clusterCodec) {
         SecureConnectionPtr sc(new SecureConnection());
         clusterCodec->setSecureConnection(sc.get());
-        sc->setCodec(codec);        
+        sc->setCodec(codec);
         return sc.release();
     }
     return 0;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateClient.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateClient.cpp Thu Oct 20 18:42:46 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,9 +26,9 @@
 #include "qpid/cluster/Decoder.h"
 #include "qpid/cluster/ExpiryPolicy.h"
 #include "qpid/cluster/UpdateDataExchange.h"
-#include "qpid/client/SessionBase_0_10Access.h" 
-#include "qpid/client/ConnectionAccess.h" 
-#include "qpid/client/SessionImpl.h" 
+#include "qpid/client/SessionBase_0_10Access.h"
+#include "qpid/client/ConnectionAccess.h"
+#include "qpid/client/SessionImpl.h"
 #include "qpid/client/ConnectionImpl.h"
 #include "qpid/client/Future.h"
 #include "qpid/broker/Broker.h"
@@ -45,10 +45,13 @@
 #include "qpid/broker/SessionState.h"
 #include "qpid/broker/TxOpVisitor.h"
 #include "qpid/broker/DtxAck.h"
+#include "qpid/broker/DtxBuffer.h"
+#include "qpid/broker/DtxWorkRecord.h"
 #include "qpid/broker/TxAccept.h"
 #include "qpid/broker/TxPublish.h"
 #include "qpid/broker/RecoveredDequeue.h"
 #include "qpid/broker/RecoveredEnqueue.h"
+#include "qpid/broker/StatefulQueueObserver.h"
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/framing/ClusterConnectionMembershipBody.h"
 #include "qpid/framing/ClusterConnectionShadowReadyBody.h"
@@ -64,6 +67,7 @@
 #include <boost/bind.hpp>
 #include <boost/cast.hpp>
 #include <algorithm>
+#include <iterator>
 #include <sstream>
 
 namespace qpid {
@@ -82,11 +86,20 @@ using namespace framing;
 namespace arg=client::arg;
 using client::SessionBase_0_10Access;
 
+// Reserved exchange/queue name for catch-up, avoid clashes with user queues/exchanges.
+const std::string UpdateClient::UPDATE("x-qpid.cluster-update");
+// Name for header used to carry expiration information.
+const std::string UpdateClient::X_QPID_EXPIRATION = "x-qpid.expiration";
+// Headers used to flag headers/properties added by the UpdateClient so they can be
+// removed on the other side.
+const std::string UpdateClient::X_QPID_NO_MESSAGE_PROPS = "x-qpid.no-message-props";
+const std::string UpdateClient::X_QPID_NO_HEADERS = "x-qpid.no-headers";
+
 std::ostream& operator<<(std::ostream& o, const UpdateClient& c) {
     return o << "cluster(" << c.updaterId << " UPDATER)";
 }
 
-struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection, public framing::FrameHandler 
+struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection, public framing::FrameHandler
 {
     boost::shared_ptr<qpid::client::ConnectionImpl> connection;
 
@@ -120,7 +133,7 @@ void send(client::AsyncSession& s, const
 // TODO aconway 2008-09-24: optimization: update connections/sessions in parallel.
 
 UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const Url& url,
-                           broker::Broker& broker, const ClusterMap& m, ExpiryPolicy& expiry_, 
+                           broker::Broker& broker, const ClusterMap& m, ExpiryPolicy& expiry_,
                            const Cluster::ConnectionVector& cons, Decoder& decoder_,
                            const boost::function<void()>& ok,
                            const boost::function<void(const std::exception&)>& fail,
@@ -134,13 +147,11 @@ UpdateClient::UpdateClient(const MemberI
 
 UpdateClient::~UpdateClient() {}
 
-// Reserved exchange/queue name for catch-up, avoid clashes with user queues/exchanges.
-const std::string UpdateClient::UPDATE("qpid.cluster-update");
-
 void UpdateClient::run() {
     try {
         connection.open(updateeUrl, connectionSettings);
         session = connection.newSession(UPDATE);
+        session.sync();
         update();
         done();
     } catch (const std::exception& e) {
@@ -154,6 +165,13 @@ void UpdateClient::update() {
              << " at " << updateeUrl);
     Broker& b = updaterBroker;
 
+    if(b.getExpiryPolicy()) {
+        QPID_LOG(debug, *this << "Updating updatee with cluster time");
+        qpid::sys::AbsTime clusterTime = b.getExpiryPolicy()->getCurrentTime();
+        int64_t time = qpid::sys::Duration(qpid::sys::EPOCH, clusterTime);
+        ClusterConnectionProxy(session).clock(time);
+    }
+
     updateManagementSetupState();
 
     b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1));
@@ -163,16 +181,20 @@ void UpdateClient::update() {
     // longer on their original queue.
     session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true);
     session.sync();
+
     std::for_each(connections.begin(), connections.end(),
                   boost::bind(&UpdateClient::updateConnection, this, _1));
-    session.queueDelete(arg::queue=UPDATE);
+
+    // some Queue Observers need session state & msgs synced first, so sync observers now
+    b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueObservers, this, _1));
 
     // Update queue listeners: must come after sessions so consumerNumbering is populated
     b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1));
 
-    ClusterConnectionProxy(session).expiryId(expiry.getId());
     updateLinks();
     updateManagementAgent();
+    updateDtxManager();
+    session.queueDelete(arg::queue=UPDATE);
 
     session.close();
 
@@ -184,7 +206,7 @@ void UpdateClient::update() {
 
     // NOTE: connection will be closed from the other end, don't close
     // it here as that causes a race.
-    
+
     // TODO aconway 2010-03-15: This sleep avoids the race condition
     // described in // https://bugzilla.redhat.com/show_bug.cgi?id=568831.
     // It allows the connection to fully close before destroying the
@@ -276,7 +298,7 @@ class MessageUpdater {
     framing::SequenceNumber lastPos;
     client::AsyncSession session;
     ExpiryPolicy& expiry;
-    
+
   public:
 
     MessageUpdater(const string& q, const client::AsyncSession s, ExpiryPolicy& expiry_) : queue(q), haveLastPos(false), session(s), expiry(expiry_) {
@@ -293,7 +315,6 @@ class MessageUpdater {
         }
     }
 
-
     void updateQueuedMessage(const broker::QueuedMessage& message) {
         // Send the queue position if necessary.
         if (!haveLastPos || message.position - lastPos != 1)  {
@@ -302,10 +323,23 @@ class MessageUpdater {
         }
         lastPos = message.position;
 
-        // Send the expiry ID if necessary.
-        if (message.payload->getProperties<DeliveryProperties>()->getTtl()) {
-            boost::optional<uint64_t> expiryId = expiry.getId(*message.payload);
-            ClusterConnectionProxy(session).expiryId(expiryId?*expiryId:0);
+        // if the ttl > 0, we need to send the calculated expiration time to the updatee
+        const DeliveryProperties* dprops =
+            message.payload->getProperties<DeliveryProperties>();
+        if (dprops && dprops->getTtl() > 0) {
+            bool hadMessageProps =
+                message.payload->hasProperties<framing::MessageProperties>();
+            const framing::MessageProperties* mprops =
+                message.payload->getProperties<framing::MessageProperties>();
+            bool hadApplicationHeaders = mprops->hasApplicationHeaders();
+            message.payload->insertCustomProperty(UpdateClient::X_QPID_EXPIRATION,
+                            sys::Duration(sys::EPOCH, message.payload->getExpiration()));
+            // If message properties or application headers didn't exist
+            // prior to us adding data, we want to remove them on the other side.
+            if (!hadMessageProps)
+                message.payload->insertCustomProperty(UpdateClient::X_QPID_NO_MESSAGE_PROPS, 0);
+            else if (!hadApplicationHeaders)
+                message.payload->insertCustomProperty(UpdateClient::X_QPID_NO_HEADERS, 0);
         }
 
         // We can't send a broker::Message via the normal client API,
@@ -318,7 +352,7 @@ class MessageUpdater {
         framing::MessageTransferBody transfer(
             *message.payload->getFrames().as<framing::MessageTransferBody>());
         transfer.setDestination(UpdateClient::UPDATE);
-        
+
         sb.get()->send(transfer, message.payload->getFrames(),
                        !message.payload->isContentReleased());
         if (message.payload->isContentReleased()){
@@ -326,9 +360,10 @@ class MessageUpdater {
             uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
             bool morecontent = true;
             for (uint64_t offset = 0; morecontent; offset += maxContentSize)
-            {            
+            {
                 AMQFrame frame((AMQContentBody()));
-                morecontent = message.payload->getContentFrame(*(message.queue), frame, maxContentSize, offset);
+                morecontent = message.payload->getContentFrame(
+                    *(message.queue), frame, maxContentSize, offset);
                 sb.get()->sendRawFrame(frame);
             }
         }
@@ -357,6 +392,8 @@ void UpdateClient::updateQueue(client::A
     if (qpid::broker::Fairshare::getState(q->getMessages(), priority, count)) {
         ClusterConnectionProxy(s).queueFairshareState(q->getName(), priority, count);
     }
+
+    ClusterConnectionProxy(s).queueDequeueSincePurgeState(q->getName(), q->getDequeueSincePurge());
 }
 
 void UpdateClient::updateExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) {
@@ -372,7 +409,11 @@ void UpdateClient::updateNonExclusiveQue
 }
 
 void UpdateClient::updateBinding(client::AsyncSession& s, const std::string& queue, const QueueBinding& binding) {
-    s.exchangeBind(queue, binding.exchange, binding.key, binding.args);
+    if (binding.exchange.size())
+        s.exchangeBind(queue, binding.exchange, binding.key, binding.args);
+    // else it's the default exchange and there is no need to replicate
+    // the binding; the creation of the queue will have done so
+    // automatically.
 }
 
 void UpdateClient::updateOutputTask(const sys::OutputTask* task) {
@@ -380,8 +421,8 @@ void UpdateClient::updateOutputTask(cons
         boost::polymorphic_downcast<const SemanticState::ConsumerImpl*> (task);
     SemanticState::ConsumerImpl* ci = const_cast<SemanticState::ConsumerImpl*>(cci);
     uint16_t channel =  ci->getParent().getSession().getChannel();
-    ClusterConnectionProxy(shadowConnection).outputTask(channel,  ci->getName());
-    QPID_LOG(debug, *this << " updating output task " << ci->getName()
+    ClusterConnectionProxy(shadowConnection).outputTask(channel,  ci->getTag());
+    QPID_LOG(debug, *this << " updating output task " << ci->getTag()
              << " channel=" << channel);
 }
 
@@ -389,7 +430,7 @@ void UpdateClient::updateConnection(cons
     QPID_LOG(debug, *this << " updating connection " << *updateConnection);
     assert(updateConnection->getBrokerConnection());
     broker::Connection& bc = *updateConnection->getBrokerConnection();
-    
+
     // Send the management ID first on the main connection.
     std::string mgmtId = updateConnection->getBrokerConnection()->getMgmtId();
     ClusterConnectionProxy(session).shadowPrepare(mgmtId);
@@ -426,7 +467,7 @@ void UpdateClient::updateSession(broker:
 
     QPID_LOG(debug, *this << " updating session " << ss->getId());
 
-    // Create a client session to update session state. 
+    // Create a client session to update session state.
     boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection);
     boost::shared_ptr<client::SessionImpl> simpl = cimpl->newSession(ss->getId().getName(), ss->getTimeout(), sh.getChannel());
     simpl->disableAutoDetach();
@@ -445,19 +486,19 @@ void UpdateClient::updateSession(broker:
     QPID_LOG(debug, *this << " updating unacknowledged messages.");
     broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked();
     std::for_each(drs.begin(), drs.end(),
-                  boost::bind(&UpdateClient::updateUnacked, this, _1));
+                  boost::bind(&UpdateClient::updateUnacked, this, _1, shadowSession));
 
-    updateTxState(ss->getSemanticState());           // Tx transaction state.
+    updateTransactionState(ss->getSemanticState());
 
     // Adjust command counter for message in progress, will be sent after state update.
     boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress();
     SequenceNumber received = ss->receiverGetReceived().command;
-    if (inProgress)  
+    if (inProgress)
         --received;
 
     // Sync the session to ensure all responses from broker have been processed.
     shadowSession.sync();
-    
+
     // Reset command-sequence state.
     proxy.sessionState(
         ss->senderGetReplayPoint().command,
@@ -466,7 +507,8 @@ void UpdateClient::updateSession(broker:
         std::max(received, ss->receiverGetExpected().command),
         received,
         ss->receiverGetUnknownComplete(),
-        ss->receiverGetIncomplete()
+        ss->receiverGetIncomplete(),
+        ss->getSemanticState().getDtxSelected()
     );
 
     // Send frames for partial message in progress.
@@ -479,13 +521,13 @@ void UpdateClient::updateSession(broker:
 void UpdateClient::updateConsumer(
     const broker::SemanticState::ConsumerImpl::shared_ptr& ci)
 {
-    QPID_LOG(debug, *this << " updating consumer " << ci->getName() << " on "
+    QPID_LOG(debug, *this << " updating consumer " << ci->getTag() << " on "
              << shadowSession.getId());
 
     using namespace message;
     shadowSession.messageSubscribe(
         arg::queue       = ci->getQueue()->getName(),
-        arg::destination = ci->getName(),
+        arg::destination = ci->getTag(),
         arg::acceptMode  = ci->isAckExpected() ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE,
         arg::acquireMode = ci->isAcquire() ? ACQUIRE_MODE_PRE_ACQUIRED : ACQUIRE_MODE_NOT_ACQUIRED,
         arg::exclusive   = ci->isExclusive(),
@@ -493,29 +535,32 @@ void UpdateClient::updateConsumer(
         arg::resumeTtl   = ci->getResumeTtl(),
         arg::arguments   = ci->getArguments()
     );
-    shadowSession.messageSetFlowMode(ci->getName(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT);
-    shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit());
-    shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_BYTE, ci->getByteCredit());
+    shadowSession.messageSetFlowMode(ci->getTag(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT);
+    shadowSession.messageFlow(ci->getTag(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit());
+    shadowSession.messageFlow(ci->getTag(), CREDIT_UNIT_BYTE, ci->getByteCredit());
     ClusterConnectionProxy(shadowSession).consumerState(
-        ci->getName(),
+        ci->getTag(),
         ci->isBlocked(),
         ci->isNotifyEnabled(),
         ci->position
     );
     consumerNumbering.add(ci.get());
 
-    QPID_LOG(debug, *this << " updated consumer " << ci->getName()
+    QPID_LOG(debug, *this << " updated consumer " << ci->getTag()
              << " on " << shadowSession.getId());
 }
-    
-void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr) {
-    if (!dr.isEnded() && dr.isAcquired() && dr.getMessage().payload) {
+
+void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr,
+                                 client::AsyncSession& updateSession)
+{
+    if (!dr.isEnded() && dr.isAcquired()) {
+        assert(dr.getMessage().payload);
         // If the message is acquired then it is no longer on the
         // updatees queue, put it on the update queue for updatee to pick up.
         //
-        MessageUpdater(UPDATE, shadowSession, expiry).updateQueuedMessage(dr.getMessage());
+        MessageUpdater(UPDATE, updateSession, expiry).updateQueuedMessage(dr.getMessage());
     }
-    ClusterConnectionProxy(shadowSession).deliveryRecord(
+    ClusterConnectionProxy(updateSession).deliveryRecord(
         dr.getQueue()->getName(),
         dr.getMessage().position,
         dr.getTag(),
@@ -536,10 +581,12 @@ class TxOpUpdater : public broker::TxOpC
     TxOpUpdater(UpdateClient& dc, client::AsyncSession s, ExpiryPolicy& expiry)
         : MessageUpdater(UpdateClient::UPDATE, s, expiry), parent(dc), session(s), proxy(s) {}
 
-    void operator()(const broker::DtxAck& ) {
-        throw InternalErrorException("DTX transactions not currently supported by cluster.");
+    void operator()(const broker::DtxAck& ack) {
+        std::for_each(ack.getPending().begin(), ack.getPending().end(),
+                      boost::bind(&UpdateClient::updateUnacked, &parent, _1, session));
+        proxy.dtxAck();
     }
-    
+
     void operator()(const broker::RecoveredDequeue& rdeq) {
         updateMessage(rdeq.getMessage());
         proxy.txEnqueue(rdeq.getQueue()->getName());
@@ -554,13 +601,18 @@ class TxOpUpdater : public broker::TxOpC
         proxy.txAccept(txAccept.getAcked());
     }
 
+    typedef std::list<Queue::shared_ptr> QueueList;
+
+    void copy(const QueueList& l, Array& a) {
+        for (QueueList::const_iterator i = l.begin(); i!=l.end(); ++i)
+            a.push_back(Array::ValuePtr(new Str8Value((*i)->getName())));
+    }
+
     void operator()(const broker::TxPublish& txPub) {
         updateMessage(txPub.getMessage());
-        typedef std::list<Queue::shared_ptr> QueueList;
-        const QueueList& qlist = txPub.getQueues();
+        assert(txPub.getQueues().empty() || txPub.getPrepared().empty());
         Array qarray(TYPE_CODE_STR8);
-        for (QueueList::const_iterator i = qlist.begin(); i != qlist.end(); ++i) 
-            qarray.push_back(Array::ValuePtr(new Str8Value((*i)->getName())));
+        copy(txPub.getQueues().empty() ? txPub.getPrepared() : txPub.getQueues(), qarray);
         proxy.txPublish(qarray, txPub.delivered);
     }
 
@@ -569,18 +621,44 @@ class TxOpUpdater : public broker::TxOpC
     client::AsyncSession session;
     ClusterConnectionProxy proxy;
 };
-    
-void UpdateClient::updateTxState(broker::SemanticState& s) {
-    QPID_LOG(debug, *this << " updating TX transaction state.");
+
+void UpdateClient::updateBufferRef(const broker::DtxBuffer::shared_ptr& dtx,bool suspended)
+{
+    ClusterConnectionProxy proxy(shadowSession);
+    broker::DtxWorkRecord* record =
+        updaterBroker.getDtxManager().getWork(dtx->getXid());
+    proxy.dtxBufferRef(dtx->getXid(), record->indexOf(dtx), suspended);
+
+}
+
+void UpdateClient::updateTransactionState(broker::SemanticState& s) {
     ClusterConnectionProxy proxy(shadowSession);
     proxy.accumulatedAck(s.getAccumulatedAck());
-    broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer();
-    if (txBuffer) {
+    broker::TxBuffer::shared_ptr tx = s.getTxBuffer();
+    broker::DtxBuffer::shared_ptr dtx = s.getDtxBuffer();
+    if (dtx) {
+        updateBufferRef(dtx, false); // Current transaction.
+    } else if (tx) {
         proxy.txStart();
         TxOpUpdater updater(*this, shadowSession, expiry);
-        txBuffer->accept(updater);
+        tx->accept(updater);
         proxy.txEnd();
     }
+    for (SemanticState::DtxBufferMap::iterator i = s.getSuspendedXids().begin();
+         i != s.getSuspendedXids().end();
+         ++i)
+    {
+        updateBufferRef(i->second, true);
+    }
+}
+
+void UpdateClient::updateDtxBuffer(const broker::DtxBuffer::shared_ptr& dtx) {
+    ClusterConnectionProxy proxy(session);
+    proxy.dtxStart(
+        dtx->getXid(), dtx->isEnded(), dtx->isSuspended(), dtx->isFailed(), dtx->isExpired());
+    TxOpUpdater updater(*this, session, expiry);
+    dtx->accept(updater);
+    proxy.dtxEnd();
 }
 
 void UpdateClient::updateQueueListeners(const boost::shared_ptr<broker::Queue>& queue) {
@@ -615,4 +693,35 @@ void UpdateClient::updateBridge(const bo
     ClusterConnectionProxy(session).config(encode(*bridge));
 }
 
+void UpdateClient::updateQueueObservers(const boost::shared_ptr<broker::Queue>& q)
+{
+    q->eachObserver(boost::bind(&UpdateClient::updateObserver, this, q, _1));
+}
+
+void UpdateClient::updateObserver(const boost::shared_ptr<broker::Queue>& q,
+                                        boost::shared_ptr<broker::QueueObserver> o)
+{
+    qpid::framing::FieldTable state;
+    broker::StatefulQueueObserver *so = dynamic_cast<broker::StatefulQueueObserver *>(o.get());
+    if (so) {
+        so->getState( state );
+        std::string id(so->getId());
+        QPID_LOG(debug, *this << " updating queue " << q->getName() << "'s observer " << id);
+        ClusterConnectionProxy(session).queueObserverState( q->getName(), id, state );
+    }
+}
+
+void UpdateClient::updateDtxManager() {
+    broker::DtxManager& dtm = updaterBroker.getDtxManager();
+    dtm.each(boost::bind(&UpdateClient::updateDtxWorkRecord, this, _1));
+}
+
+void UpdateClient::updateDtxWorkRecord(const broker::DtxWorkRecord& r) {
+    QPID_LOG(debug, *this << " updating DTX transaction: " << r.getXid());
+    for (size_t i = 0; i < r.size(); ++i)
+        updateDtxBuffer(r[i]);
+    ClusterConnectionProxy(session).dtxWorkRecord(
+        r.getXid(), r.isPrepared(), r.getTimeout());
+}
+
 }} // namespace qpid::cluster

Propchange: qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateClient.cpp
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Oct 20 18:42:46 2011
@@ -0,0 +1,3 @@
+/qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/UpdateClient.cpp:1061302-1072333
+/qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/UpdateClient.cpp:1144319-1179855
+/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp:1072051-1185907

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateClient.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateClient.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateClient.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateClient.h Thu Oct 20 18:42:46 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -34,7 +34,7 @@
 
 namespace qpid {
 
-class Url;
+struct Url;
 
 namespace broker {
 
@@ -42,8 +42,8 @@ class Broker;
 class Queue;
 class Exchange;
 class QueueBindings;
-class QueueBinding;
-class QueuedMessage;
+struct QueueBinding;
+struct QueuedMessage;
 class SessionHandler;
 class DeliveryRecord;
 class SessionState;
@@ -51,7 +51,8 @@ class SemanticState;
 class Decoder;
 class Link;
 class Bridge;
-
+class QueueObserver;
+class DtxBuffer;
 } // namespace broker
 
 namespace cluster {
@@ -68,21 +69,26 @@ class ExpiryPolicy;
 class UpdateClient : public sys::Runnable {
   public:
     static const std::string UPDATE; // Name for special update queue and exchange.
+    static const std::string X_QPID_EXPIRATION; // Update message expiration
+    // Flag to remove props/headers that were added by the UpdateClient
+    static const std::string X_QPID_NO_MESSAGE_PROPS;
+    static const std::string X_QPID_NO_HEADERS;
+
     static client::Connection catchUpConnection();
-    
+
     UpdateClient(const MemberId& updater, const MemberId& updatee, const Url&,
                  broker::Broker& donor, const ClusterMap& map, ExpiryPolicy& expiry,
                  const std::vector<boost::intrusive_ptr<Connection> >&, Decoder&,
                  const boost::function<void()>& done,
                  const boost::function<void(const std::exception&)>& fail,
-                 const client::ConnectionSettings& 
+                 const client::ConnectionSettings&
     );
 
     ~UpdateClient();
     void update();
     void run();                 // Will delete this when finished.
 
-    void updateUnacked(const broker::DeliveryRecord&);
+    void updateUnacked(const broker::DeliveryRecord&, client::AsyncSession&);
 
   private:
     void updateQueue(client::AsyncSession&, const boost::shared_ptr<broker::Queue>&);
@@ -94,7 +100,8 @@ class UpdateClient : public sys::Runnabl
     void updateBinding(client::AsyncSession&, const std::string& queue, const broker::QueueBinding& binding);
     void updateConnection(const boost::intrusive_ptr<Connection>& connection);
     void updateSession(broker::SessionHandler& s);
-    void updateTxState(broker::SemanticState& s);
+    void updateBufferRef(const broker::DtxBuffer::shared_ptr& dtx, bool suspended);
+    void updateTransactionState(broker::SemanticState& s);
     void updateOutputTask(const sys::OutputTask* task);
     void updateConsumer(const broker::SemanticState::ConsumerImpl::shared_ptr&);
     void updateQueueListeners(const boost::shared_ptr<broker::Queue>&);
@@ -104,6 +111,11 @@ class UpdateClient : public sys::Runnabl
     void updateLinks();
     void updateLink(const boost::shared_ptr<broker::Link>&);
     void updateBridge(const boost::shared_ptr<broker::Bridge>&);
+    void updateQueueObservers(const boost::shared_ptr<broker::Queue>&);
+    void updateObserver(const boost::shared_ptr<broker::Queue>&, boost::shared_ptr<broker::QueueObserver>);
+    void updateDtxManager();
+    void updateDtxBuffer(const boost::shared_ptr<broker::DtxBuffer>& );
+    void updateDtxWorkRecord(const broker::DtxWorkRecord&);
 
 
     Numbering<broker::SemanticState::ConsumerImpl*> consumerNumbering;

Propchange: qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateClient.h
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Oct 20 18:42:46 2011
@@ -0,0 +1,2 @@
+/qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/UpdateClient.h:1061302-1072333
+/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h:1072051-1185907

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateDataExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateDataExchange.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateDataExchange.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateDataExchange.cpp Thu Oct 20 18:42:46 2011
@@ -36,13 +36,8 @@ const std::string UpdateDataExchange::MA
 const std::string UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY("management-schemas");
 const std::string UpdateDataExchange::MANAGEMENT_DELETED_OBJECTS_KEY("management-deleted-objects");
 
-std::ostream& operator<<(std::ostream& o, const UpdateDataExchange& c) {
-    return o << "cluster(" << c.clusterId << " UPDATER)";
-}
-
 UpdateDataExchange::UpdateDataExchange(Cluster& cluster) :
-    Exchange(EXCHANGE_NAME, &cluster),
-    clusterId(cluster.getId())
+    Exchange(EXCHANGE_NAME, &cluster)
 {}
 
 void UpdateDataExchange::route(broker::Deliverable& msg, const std::string& routingKey,
@@ -62,11 +57,9 @@ void UpdateDataExchange::updateManagemen
 
     framing::Buffer buf1(const_cast<char*>(managementAgents.data()), managementAgents.size());
     agent->importAgents(buf1);
-    QPID_LOG(debug, *this << " updated management agents.");
 
     framing::Buffer buf2(const_cast<char*>(managementSchemas.data()), managementSchemas.size());
     agent->importSchemas(buf2);
-    QPID_LOG(debug, *this << " updated management schemas.");
 
     using amqp_0_10::ListCodec;
     using types::Variant;
@@ -78,7 +71,6 @@ void UpdateDataExchange::updateManagemen
                               new management::ManagementAgent::DeletedObject(*i)));
     }
     agent->importDeletedObjects(objects);
-    QPID_LOG(debug, *this << " updated management deleted objects.");
 }
 
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org