You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 03:20:13 UTC

svn commit: r1187150 [12/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf...

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/Cluster.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/Cluster.h Fri Oct 21 01:19:00 2011
@@ -56,25 +56,17 @@ namespace qpid {
 
 namespace broker {
 class Message;
-class AclModule;
 }
 
 namespace framing {
-class AMQFrame;
 class AMQBody;
-struct Uuid;
-}
-
-namespace sys {
-class Timer;
-class AbsTime;
-class Duration;
+class Uuid;
 }
 
 namespace cluster {
 
 class Connection;
-struct EventFrame;
+class EventFrame;
 class ClusterTimer;
 class UpdateDataExchange;
 
@@ -97,10 +89,10 @@ class Cluster : private Cpg::Handler, pu
     void initialize();
 
     // Connection map.
-    void addLocalConnection(const ConnectionPtr&);
-    void addShadowConnection(const ConnectionPtr&);
-    void erase(const ConnectionId&);
-
+    void addLocalConnection(const ConnectionPtr&); 
+    void addShadowConnection(const ConnectionPtr&); 
+    void erase(const ConnectionId&);       
+    
     // URLs of current cluster members.
     std::vector<std::string> getIds() const;
     std::vector<Url> getUrls() const;
@@ -115,7 +107,7 @@ class Cluster : private Cpg::Handler, pu
     void updateInRetracted();
     // True if we are expecting to receive catch-up connections.
     bool isExpectingUpdate();
-
+    
     MemberId getId() const;
     broker::Broker& getBroker() const;
     Multicaster& getMulticast() { return mcast; }
@@ -143,12 +135,6 @@ class Cluster : private Cpg::Handler, pu
     bool deferDeliveryImpl(const std::string& queue,
                            const boost::intrusive_ptr<broker::Message>& msg);
 
-    sys::AbsTime getClusterTime();
-    void sendClockUpdate();
-    void clock(const uint64_t time);
-
-    static bool loggable(const framing::AMQFrame&); // True if the frame should be logged.
-
   private:
     typedef sys::Monitor::ScopedLock Lock;
 
@@ -158,10 +144,10 @@ class Cluster : private Cpg::Handler, pu
 
     /** Version number of the cluster protocol, to avoid mixed versions. */
     static const uint32_t CLUSTER_VERSION;
-
+    
     // NB: A dummy Lock& parameter marks functions that must only be
     // called with Cluster::lock  locked.
-
+ 
     void leave(Lock&);
     std::vector<std::string> getIds(Lock&) const;
     std::vector<Url> getUrls(Lock&) const;
@@ -170,11 +156,11 @@ class Cluster : private Cpg::Handler, pu
     void brokerShutdown();
 
     // == Called in deliverEventQueue thread
-    void deliveredEvent(const Event&);
+    void deliveredEvent(const Event&); 
 
     // == Called in deliverFrameQueue thread
-    void deliveredFrame(const EventFrame&);
-    void processFrame(const EventFrame&, Lock&);
+    void deliveredFrame(const EventFrame&); 
+    void processFrame(const EventFrame&, Lock&); 
 
     // Cluster controls implement XML methods from cluster.xml.
     void updateRequest(const MemberId&, const std::string&, Lock&);
@@ -194,12 +180,12 @@ class Cluster : private Cpg::Handler, pu
                       const std::string& left,
                       const std::string& joined,
                       Lock& l);
+    void messageExpired(const MemberId&, uint64_t, Lock& l);
     void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&);
     void timerWakeup(const MemberId&, const std::string& name, Lock&);
     void timerDrop(const MemberId&, const std::string& name, Lock&);
     void shutdown(const MemberId&, const framing::Uuid& shutdownId, Lock&);
     void deliverToQueue(const std::string& queue, const std::string& message, Lock&);
-    void clock(const uint64_t time, Lock&);
 
     // Helper functions
     ConnectionPtr getConnection(const EventFrame&, Lock&);
@@ -209,7 +195,7 @@ class Cluster : private Cpg::Handler, pu
     void setReady(Lock&);
     void memberUpdate(Lock&);
     void setClusterId(const framing::Uuid&, Lock&);
-    void erase(const ConnectionId&, Lock&);
+    void erase(const ConnectionId&, Lock&);       
     void requestUpdate(Lock& );
     void initMapCompleted(Lock&);
     void becomeElder(Lock&);
@@ -217,7 +203,7 @@ class Cluster : private Cpg::Handler, pu
     void updateMgmtMembership(Lock&);
 
     // == Called in CPG dispatch thread
-    void deliver( // CPG deliver callback.
+    void deliver( // CPG deliver callback. 
         cpg_handle_t /*handle*/,
         const struct cpg_name *group,
         uint32_t /*nodeid*/,
@@ -226,7 +212,7 @@ class Cluster : private Cpg::Handler, pu
         int /*msg_len*/);
 
     void deliverEvent(const Event&);
-
+    
     void configChange( // CPG config change callback.
         cpg_handle_t /*handle*/,
         const struct cpg_name */*group*/,
@@ -277,7 +263,7 @@ class Cluster : private Cpg::Handler, pu
     // Used only in deliverEventQueue thread or when stalled for update.
     Decoder decoder;
     bool discarding;
-
+    
 
     // Remaining members are protected by lock.
     mutable sys::Monitor lock;
@@ -290,7 +276,7 @@ class Cluster : private Cpg::Handler, pu
         JOINER,  ///< Sent update request, waiting for update offer.
         UPDATEE, ///< Stalled receive queue at update offer, waiting for update to complete.
         CATCHUP, ///< Update complete, unstalled but has not yet seen own "ready" event.
-        READY,   ///< Fully operational
+        READY,   ///< Fully operational 
         OFFER,   ///< Sent an offer, waiting for accept/reject.
         UPDATER, ///< Offer accepted, sending a state update.
         LEFT     ///< Final state, left the cluster.
@@ -310,13 +296,9 @@ class Cluster : private Cpg::Handler, pu
     ErrorCheck error;
     UpdateReceiver updateReceiver;
     ClusterTimer* timer;
-    sys::Timer clockTimer;
-    sys::AbsTime clusterTime;
-    sys::Duration clusterTimeOffset;
-    broker::AclModule* acl;
 
   friend std::ostream& operator<<(std::ostream&, const Cluster&);
-  friend struct ClusterDispatcher;
+  friend class ClusterDispatcher;
 };
 
 }} // namespace qpid::cluster

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/ClusterMap.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/ClusterMap.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/ClusterMap.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/ClusterMap.cpp Fri Oct 21 01:19:00 2011
@@ -50,6 +50,11 @@ void insertFieldTableFromMapValue(FieldT
     ft.setString(vt.first.str(), vt.second.str());
 }
 
+void assignFieldTable(FieldTable& ft, const ClusterMap::Map& map) {
+    ft.clear();
+    for_each(map.begin(), map.end(), bind(&insertFieldTableFromMapValue, ref(ft), _1));
+}
+
 }
 
 ClusterMap::ClusterMap() : frameSeq(0) {}

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/ClusterPlugin.cpp Fri Oct 21 01:19:00 2011
@@ -72,7 +72,6 @@ struct ClusterOptions : public Options {
             ("cluster-cman", optValue(settings.quorum), "Integrate with Cluster Manager (CMAN) cluster.")
 #endif
             ("cluster-size", optValue(settings.size, "N"), "Wait for N cluster members before allowing clients to connect.")
-            ("cluster-clock-interval", optValue(settings.clockInterval,"N"), "How often to broadcast the current time to the cluster nodes, in milliseconds. A value between 5 and 1000 is recommended.")
             ("cluster-read-max", optValue(settings.readMax,"N"), "Experimental: flow-control limit  reads per connection. 0=no limit.")
             ;
     }

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/ClusterSettings.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/ClusterSettings.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/ClusterSettings.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/ClusterSettings.h Fri Oct 21 01:19:00 2011
@@ -35,9 +35,8 @@ struct ClusterSettings {
     size_t readMax;
     std::string username, password, mechanism;
     size_t size;
-    uint16_t clockInterval;
 
-    ClusterSettings() : quorum(false), readMax(10), size(1), clockInterval(10)
+    ClusterSettings() : quorum(false), readMax(10), size(1)
     {}
   
     Url getUrl(uint16_t port) const {

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/ClusterTimer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/ClusterTimer.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/ClusterTimer.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/ClusterTimer.cpp Fri Oct 21 01:19:00 2011
@@ -70,7 +70,6 @@ void ClusterTimer::add(intrusive_ptr<Tim
     if (i != map.end())
         throw Exception(QPID_MSG("Task already exists with name " << task->getName()));
     map[task->getName()] = task;
-
     // Only the elder actually activates the task with the Timer base class.
     if (cluster.isElder()) {
         QPID_LOG(trace, "Elder activating cluster timer task " << task->getName());
@@ -113,9 +112,6 @@ void ClusterTimer::deliverWakeup(const s
     else {
         intrusive_ptr<TimerTask> t = i->second;
         map.erase(i);
-        // Move the nextFireTime so readyToFire() is true. This is to ensure we
-        // don't get an error if the fired task calls setupNextFire()
-        t->setFired();
         Timer::fire(t);
     }
 }

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/Connection.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/Connection.cpp Fri Oct 21 01:19:00 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * 
  *   http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,8 +24,6 @@
 #include "Cluster.h"
 #include "UpdateReceiver.h"
 #include "qpid/assert.h"
-#include "qpid/broker/DtxAck.h"
-#include "qpid/broker/DtxBuffer.h"
 #include "qpid/broker/SessionState.h"
 #include "qpid/broker/SemanticState.h"
 #include "qpid/broker/TxBuffer.h"
@@ -37,7 +35,6 @@
 #include "qpid/broker/Fairshare.h"
 #include "qpid/broker/Link.h"
 #include "qpid/broker/Bridge.h"
-#include "qpid/broker/StatefulQueueObserver.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/framing/enum.h"
 #include "qpid/framing/AMQFrame.h"
@@ -81,7 +78,7 @@ const std::string shadowPrefix("[shadow]
 Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
                        const std::string& mgmtId,
                        const ConnectionId& id, const qpid::sys::SecuritySettings& external)
-    : cluster(c), self(id), catchUp(false), announced(false), output(*this, out),
+    : cluster(c), self(id), catchUp(false), output(*this, out),
       connectionCtor(&output, cluster.getBroker(), mgmtId, external, false, 0, true),
       expectProtocolHeader(false),
       mcastFrameHandler(cluster.getMulticast(), self),
@@ -93,15 +90,13 @@ Connection::Connection(Cluster& c, sys::
 Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
                        const std::string& mgmtId, MemberId member,
                        bool isCatchUp, bool isLink, const qpid::sys::SecuritySettings& external
-) : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), announced(false), output(*this, out),
+) : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), output(*this, out),
     connectionCtor(&output, cluster.getBroker(),
                    mgmtId,
                    external,
                    isLink,
                    isCatchUp ? ++catchUpId : 0,
-                   // The first catch-up connection is not considered a shadow
-                   // as it needs to be authenticated.
-                   isCatchUp && self.second > 1),
+                   isCatchUp),  // isCatchUp => shadow
     expectProtocolHeader(isLink),
     mcastFrameHandler(cluster.getMulticast(), self),
     updateIn(c.getUpdateReceiver()),
@@ -118,7 +113,7 @@ Connection::Connection(Cluster& c, sys::
         if (!updateIn.nextShadowMgmtId.empty())
             connectionCtor.mgmtId = updateIn.nextShadowMgmtId;
         updateIn.nextShadowMgmtId.clear();
-    }
+     }
     init();
     QPID_LOG(debug, cluster << " local connection " << *this);
 }
@@ -148,7 +143,7 @@ void Connection::init() {
 // Called when we have consumed a read buffer to give credit to the
 // connection layer to continue reading.
 void Connection::giveReadCredit(int credit) {
-    if (cluster.getSettings().readMax && credit)
+    if (cluster.getSettings().readMax && credit) 
         output.giveReadCredit(credit);
 }
 
@@ -171,7 +166,7 @@ void Connection::announce(
         AMQFrame frame;
         while (frame.decode(buf))
             connection->received(frame);
-        connection->setUserId(username);
+         connection->setUserId(username);
     }
     // Do managment actions now that the connection is replicated.
     connection->raiseConnectEvent();
@@ -198,7 +193,7 @@ void Connection::received(framing::AMQFr
                  << *this << ": " << f);
         return;
     }
-    QPID_LOG_IF(trace, Cluster::loggable(f), cluster << " RECV " << *this << ": " << f);
+    QPID_LOG(trace, cluster << " RECV " << *this << ": " << f);
     if (isLocal()) {            // Local catch-up connection.
         currentChannel = f.getChannel();
         if (!framing::invoke(*this, *f.getBody()).wasHandled())
@@ -206,7 +201,7 @@ void Connection::received(framing::AMQFr
     }
     else {             // Shadow or updated catch-up connection.
         if (f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) {
-            if (isShadow())
+            if (isShadow()) 
                 cluster.addShadowConnection(this);
             AMQFrame ok((ConnectionCloseOkBody()));
             connection->getOutput().send(ok);
@@ -218,9 +213,16 @@ void Connection::received(framing::AMQFr
     }
 }
 
-bool Connection::checkUnsupported(const AMQBody&) {
-    // Throw an exception for unsupported commands. Currently all are supported.
-    return false;
+bool Connection::checkUnsupported(const AMQBody& body) {
+    std::string message;
+    if (body.getMethod()) {
+        switch (body.getMethod()->amqpClassId()) {
+          case DTX_CLASS_ID: message = "DTX transactions are not currently supported by cluster."; break;
+        }
+    }
+    if (!message.empty())
+        connection->close(connection::CLOSE_CODE_FRAMING_ERROR, message);
+    return !message.empty();
 }
 
 struct GiveReadCreditOnExit {
@@ -239,7 +241,7 @@ void Connection::deliverDoOutput(uint32_
 void Connection::deliveredFrame(const EventFrame& f) {
     GiveReadCreditOnExit gc(*this, f.readCredit);
     assert(!catchUp);
-    currentChannel = f.frame.getChannel();
+    currentChannel = f.frame.getChannel(); 
     if (f.frame.getBody()       // frame can be emtpy with just readCredit
         && !framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol.
         && !checkUnsupported(*f.frame.getBody())) // Unsupported operation.
@@ -253,7 +255,7 @@ void Connection::deliveredFrame(const Ev
     }
 }
 
-// A local connection is closed by the network layer. Called in the connection thread.
+// A local connection is closed by the network layer.
 void Connection::closed() {
     try {
         if (isUpdated()) {
@@ -270,9 +272,8 @@ void Connection::closed() {
             // closed and process any outstanding frames from the cluster
             // until self-delivery of deliver-close.
             output.closeOutput();
-            if (announced)
-                cluster.getMulticast().mcastControl(
-                    ClusterConnectionDeliverCloseBody(), self);
+            cluster.getMulticast().mcastControl(
+                ClusterConnectionDeliverCloseBody(), self);
         }
     }
     catch (const std::exception& e) {
@@ -286,7 +287,7 @@ void Connection::deliverClose () {
     cluster.erase(self);
 }
 
-// Close the connection
+// Close the connection 
 void Connection::close() {
     if (connection.get()) {
         QPID_LOG(debug, cluster << " closed connection " << *this);
@@ -319,10 +320,10 @@ size_t Connection::decode(const char* da
         while (localDecoder.decode(buf))
             received(localDecoder.getFrame());
         if (!wasOpen && connection->isOpen()) {
-            // Connections marked with setUserProxyAuth are allowed to proxy
+            // Connections marked as federation links are allowed to proxy
             // messages with user-ID that doesn't match the connection's
             // authenticated ID. This is important for updates.
-            connection->setUserProxyAuth(isCatchUp());
+            connection->setFederationLink(isCatchUp());
         }
     }
     else {                      // Multicast local connections.
@@ -331,9 +332,9 @@ size_t Connection::decode(const char* da
         if (!checkProtocolHeader(ptr, size)) // Updates ptr
             return 0; // Incomplete header
 
-        if (!connection->isOpen())
+        if (!connection->isOpen()) 
             processInitialFrames(ptr, end-ptr); // Updates ptr
-
+        
         if (connection->isOpen() && end - ptr > 0) {
             // We're multi-casting, we will give read credit on delivery.
             grc.credit = 0;
@@ -383,7 +384,6 @@ void Connection::processInitialFrames(co
                 connection->getUserId(),
                 initialFrames),
             getId());
-        announced = true;
         initialFrames.clear();
     }
 }
@@ -406,11 +406,11 @@ void Connection::shadowSetUser(const std
 
 void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled, const SequenceNumber& position)
 {
-    broker::SemanticState::ConsumerImpl::shared_ptr c = semanticState().find(name);
-    c->position = position;
-    c->setBlocked(blocked);
-    if (notifyEnabled) c->enableNotify(); else c->disableNotify();
-    updateIn.consumerNumbering.add(c);
+    broker::SemanticState::ConsumerImpl& c = semanticState().find(name);
+    c.position = position;
+    c.setBlocked(blocked);
+    if (notifyEnabled) c.enableNotify(); else c.disableNotify();
+    updateIn.consumerNumbering.add(c.shared_from_this());
 }
 
 
@@ -421,8 +421,7 @@ void Connection::sessionState(
     const SequenceNumber& expected,
     const SequenceNumber& received,
     const SequenceSet& unknownCompleted,
-    const SequenceSet& receivedIncomplete,
-    bool dtxSelected)
+    const SequenceSet& receivedIncomplete)
 {
     sessionState().setState(
         replayStart,
@@ -432,10 +431,8 @@ void Connection::sessionState(
         received,
         unknownCompleted,
         receivedIncomplete);
-    if (dtxSelected) semanticState().selectDtx();
-    QPID_LOG(debug, cluster << " received session state update for "
-             << sessionState().getId());
-    // The output tasks will be added later in the update process.
+    QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId());
+    // The output tasks will be added later in the update process. 
     connection->getOutputTasks().removeAll();
 }
 
@@ -444,7 +441,7 @@ void Connection::outputTask(uint16_t cha
     if (!session)
         throw Exception(QPID_MSG(cluster << " channel not attached " << *this
                                  << "[" <<  channel << "] "));
-    OutputTask* task = session->getSemanticState().find(name).get();
+    OutputTask* task = &session->getSemanticState().find(name);
     connection->getOutputTasks().addOutputTask(task);
 }
 
@@ -464,24 +461,11 @@ void Connection::shadowReady(
     output.setSendMax(sendMax);
 }
 
-void Connection::setDtxBuffer(const UpdateReceiver::DtxBufferRef& bufRef) {
-    broker::DtxManager& mgr = cluster.getBroker().getDtxManager();
-    broker::DtxWorkRecord* record = mgr.getWork(bufRef.xid);
-    broker::DtxBuffer::shared_ptr buffer = (*record)[bufRef.index];
-    if (bufRef.suspended)
-        bufRef.semanticState->getSuspendedXids()[bufRef.xid] = buffer;
-    else
-        bufRef.semanticState->setDtxBuffer(buffer);
-}
-
-// Marks the end of the update.
 void Connection::membership(const FieldTable& joiners, const FieldTable& members,
                             const framing::SequenceNumber& frameSeq)
 {
     QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
     updateIn.consumerNumbering.clear();
-    for_each(updateIn.dtxBuffers.begin(), updateIn.dtxBuffers.end(),
-             boost::bind(&Connection::setDtxBuffer, this, _1));
     closeUpdated();
     cluster.updateInDone(ClusterMap(joiners, members, frameSeq));
 }
@@ -494,7 +478,7 @@ void Connection::retractOffer() {
 
 void Connection::closeUpdated() {
     self.second = 0;      // Mark this as completed update connection.
-    if (connection.get())
+    if (connection.get()) 
         connection->close(connection::CLOSE_CODE_NORMAL, "OK");
 }
 
@@ -545,20 +529,12 @@ void Connection::deliveryRecord(const st
             m = getUpdateMessage();
             m.queue = queue.get();
             m.position = position;
-            if (enqueued) queue->updateEnqueued(m); //inform queue of the message
+            if (enqueued) queue->updateEnqueued(m); //inform queue of the message 
         } else {                // Message at original position in original queue
-            queue->find(position, m);
+            m = queue->find(position);
         }
-        // FIXME aconway 2011-08-19: removed:
-        // if (!m.payload)
-        //      throw Exception(QPID_MSG("deliveryRecord no update message"));
-        //
-        // It seems this could happen legitimately in the case one
-        // session browses message M, then another session acquires
-        // it. In that case the browsers delivery record is !acquired
-        // but the message is not on its original Queue. In that case
-        // we'll get a deliveryRecord with no payload for the browser.
-        //
+        if (!m.payload)
+            throw Exception(QPID_MSG("deliveryRecord no update message"));
     }
 
     broker::DeliveryRecord dr(m, queue, tag, acquired, accepted, windowing, credit);
@@ -566,11 +542,7 @@ void Connection::deliveryRecord(const st
     if (cancelled) dr.cancel(dr.getTag());
     if (completed) dr.complete();
     if (ended) dr.setEnded();   // Exsitance of message
-
-    if (dtxBuffer)              // Record for next dtx-ack
-        dtxAckRecords.push_back(dr);
-    else
-        semanticState().record(dr); // Record on session's unacked list.
+    semanticState().record(dr); // Part of the session's unacked list.
 }
 
 void Connection::queuePosition(const string& qname, const SequenceNumber& position) {
@@ -584,46 +556,8 @@ void Connection::queueFairshareState(con
     }
 }
 
-
-namespace {
-// find a StatefulQueueObserver that matches a given identifier
-class ObserverFinder {
-    const std::string id;
-    boost::shared_ptr<broker::QueueObserver> target;
-    ObserverFinder(const ObserverFinder&) {}
-  public:
-    ObserverFinder(const std::string& _id) : id(_id) {}
-    broker::StatefulQueueObserver *getObserver()
-    {
-        if (target)
-            return dynamic_cast<broker::StatefulQueueObserver *>(target.get());
-        return 0;
-    }
-    void operator() (boost::shared_ptr<broker::QueueObserver> o)
-    {
-        if (!target) {
-            broker::StatefulQueueObserver *p = dynamic_cast<broker::StatefulQueueObserver *>(o.get());
-            if (p && p->getId() == id) {
-                target = o;
-            }
-        }
-    }
-};
-}
-
-
-void Connection::queueObserverState(const std::string& qname, const std::string& observerId, const FieldTable& state)
-{
-    boost::shared_ptr<broker::Queue> queue(findQueue(qname));
-    ObserverFinder finder(observerId);      // find this observer
-    queue->eachObserver<ObserverFinder &>(finder);
-    broker::StatefulQueueObserver *so = finder.getObserver();
-    if (so) {
-        so->setState( state );
-        QPID_LOG(debug, "updated queue observer " << observerId << "'s state on queue " << qname << "; ...");
-        return;
-    }
-    QPID_LOG(error, "Failed to find observer " << observerId << " state on queue " << qname << "; this will result in inconsistencies.");
+void Connection::expiryId(uint64_t id) {
+    cluster.getExpiryPolicy().setId(id);
 }
 
 std::ostream& operator<<(std::ostream& o, const Connection& c) {
@@ -640,7 +574,6 @@ std::ostream& operator<<(std::ostream& o
 void Connection::txStart() {
     txBuffer.reset(new broker::TxBuffer());
 }
-
 void Connection::txAccept(const framing::SequenceSet& acked) {
     txBuffer->enlist(boost::shared_ptr<broker::TxAccept>(
                          new broker::TxAccept(acked, semanticState().getUnacked())));
@@ -656,11 +589,9 @@ void Connection::txEnqueue(const std::st
                          new broker::RecoveredEnqueue(findQueue(queue), getUpdateMessage().payload)));
 }
 
-void Connection::txPublish(const framing::Array& queues, bool delivered)
-{
-    boost::shared_ptr<broker::TxPublish> txPub(
-        new broker::TxPublish(getUpdateMessage().payload));
-    for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i)
+void Connection::txPublish(const framing::Array& queues, bool delivered) {
+    boost::shared_ptr<broker::TxPublish> txPub(new broker::TxPublish(getUpdateMessage().payload));
+    for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i) 
         txPub->deliverTo(findQueue((*i)->get<std::string>()));
     txPub->delivered = delivered;
     txBuffer->enlist(txPub);
@@ -674,51 +605,6 @@ void Connection::accumulatedAck(const qp
     semanticState().setAccumulatedAck(s);
 }
 
-void Connection::dtxStart(const std::string& xid,
-                          bool ended,
-                          bool suspended,
-                          bool failed,
-                          bool expired)
-{
-    dtxBuffer.reset(new broker::DtxBuffer(xid, ended, suspended, failed, expired));
-    txBuffer = dtxBuffer;
-}
-
-void Connection::dtxEnd() {
-    broker::DtxManager& mgr = cluster.getBroker().getDtxManager();
-    std::string xid = dtxBuffer->getXid();
-    if (mgr.exists(xid))
-        mgr.join(xid, dtxBuffer);
-    else
-        mgr.start(xid, dtxBuffer);
-    dtxBuffer.reset();
-    txBuffer.reset();
-}
-
-// Sent after all DeliveryRecords for a dtx-ack have been collected in dtxAckRecords
-void Connection::dtxAck() {
-    dtxBuffer->enlist(
-        boost::shared_ptr<broker::DtxAck>(new broker::DtxAck(dtxAckRecords)));
-    dtxAckRecords.clear();
-}
-
-void Connection::dtxBufferRef(const std::string& xid, uint32_t index, bool suspended) {
-    // Save the association between DtxBuffers and the session so we
-    // can set the DtxBuffers at the end of the update when the
-    // DtxManager has been replicated.
-    updateIn.dtxBuffers.push_back(
-        UpdateReceiver::DtxBufferRef(xid, index, suspended, &semanticState()));
-}
-
-// Sent at end of work record.
-void Connection::dtxWorkRecord(const std::string& xid, bool prepared, uint32_t timeout)
-{
-    broker::DtxManager& mgr = cluster.getBroker().getDtxManager();
-    if (timeout) mgr.setTimeout(xid, timeout);
-    if (prepared) mgr.prepare(xid);
-}
-
-
 void Connection::exchange(const std::string& encoded) {
     Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
     broker::Exchange::shared_ptr ex = broker::Exchange::decode(cluster.getBroker().getExchanges(), buf);
@@ -728,6 +614,12 @@ void Connection::exchange(const std::str
     QPID_LOG(debug, cluster << " updated exchange " << ex->getName());
 }
 
+void Connection::queue(const std::string& encoded) {
+    Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
+    broker::Queue::shared_ptr q = broker::Queue::decode(cluster.getBroker().getQueues(), buf);
+    QPID_LOG(debug, cluster << " updated queue " << q->getName());
+}
+
 void Connection::sessionError(uint16_t , const std::string& msg) {
     // Ignore errors before isOpen(), we're not multicasting yet.
     if (connection->isOpen())
@@ -786,23 +678,6 @@ void Connection::config(const std::strin
     else throw Exception(QPID_MSG("Update failed, invalid kind of config: " << kind));
 }
 
-void Connection::doCatchupIoCallbacks() {
-    // We need to process IO callbacks during the catch-up phase in
-    // order to service asynchronous completions for messages
-    // transferred during catch-up.
-
-    if (catchUp) getBrokerConnection()->doIoCallbacks();
-}
-
-void Connection::clock(uint64_t time) {
-    QPID_LOG(debug, "Cluster connection received time update");
-    cluster.clock(time);
-}
-
-void Connection::queueDequeueSincePurgeState(const std::string& qname, uint32_t dequeueSincePurge) {
-    boost::shared_ptr<broker::Queue> queue(findQueue(qname));
-    queue->setDequeueSincePurge(dequeueSincePurge);
-}
 
 }} // Namespace qpid::cluster
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/Connection.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/Connection.h Fri Oct 21 01:19:00 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * 
  *   http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,12 +24,11 @@
 
 #include "types.h"
 #include "OutputInterceptor.h"
+#include "EventFrame.h"
 #include "McastFrameHandler.h"
 #include "UpdateReceiver.h"
 
-#include "qpid/RefCounted.h"
 #include "qpid/broker/Connection.h"
-#include "qpid/broker/DeliveryRecord.h"
 #include "qpid/broker/SecureConnection.h"
 #include "qpid/broker/SemanticState.h"
 #include "qpid/amqp_0_10/Connection.h"
@@ -48,7 +47,7 @@ namespace framing { class AMQFrame; }
 
 namespace broker {
 class SemanticState;
-struct QueuedMessage;
+class QueuedMessage;
 class TxBuffer;
 class TxAccept;
 }
@@ -56,7 +55,6 @@ class TxAccept;
 namespace cluster {
 class Cluster;
 class Event;
-struct EventFrame;
 
 /** Intercept broker::Connection calls for shadow and local cluster connections. */
 class Connection :
@@ -64,7 +62,7 @@ class Connection :
         public sys::ConnectionInputHandler,
         public framing::AMQP_AllOperations::ClusterConnectionHandler,
         private broker::Connection::ErrorListener
-
+        
 {
   public:
 
@@ -75,7 +73,7 @@ class Connection :
     Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& mgmtId, const ConnectionId& id,
                const qpid::sys::SecuritySettings& external);
     ~Connection();
-
+    
     ConnectionId getId() const { return self; }
     broker::Connection* getBrokerConnection() { return connection.get(); }
     const broker::Connection* getBrokerConnection() const { return connection.get(); }
@@ -110,9 +108,9 @@ class Connection :
     void deliveredFrame(const EventFrame&);
 
     void consumerState(const std::string& name, bool blocked, bool notifyEnabled, const qpid::framing::SequenceNumber& position);
-
+    
     // ==== Used in catch-up mode to build initial state.
-    //
+    // 
     // State update methods.
     void shadowPrepare(const std::string&);
 
@@ -124,11 +122,10 @@ class Connection :
                       const framing::SequenceNumber& expected,
                       const framing::SequenceNumber& received,
                       const framing::SequenceSet& unknownCompleted,
-                      const SequenceSet& receivedIncomplete,
-                      bool dtxSelected);
-
+                      const SequenceSet& receivedIncomplete);
+    
     void outputTask(uint16_t channel, const std::string& name);
-
+    
     void shadowReady(uint64_t memberId,
                      uint64_t connectionId,
                      const std::string& managementId,
@@ -156,7 +153,7 @@ class Connection :
 
     void queuePosition(const std::string&, const framing::SequenceNumber&);
     void queueFairshareState(const std::string&, const uint8_t priority, const uint8_t count);
-    void queueObserverState(const std::string&, const std::string&, const framing::FieldTable&);
+    void expiryId(uint64_t);
 
     void txStart();
     void txAccept(const framing::SequenceSet&);
@@ -166,18 +163,8 @@ class Connection :
     void txEnd();
     void accumulatedAck(const framing::SequenceSet&);
 
-    // Dtx state
-    void dtxStart(const std::string& xid,
-                  bool ended,
-                  bool suspended,
-                  bool failed,
-                  bool expired);
-    void dtxEnd();
-    void dtxAck();
-    void dtxBufferRef(const std::string& xid, uint32_t index, bool suspended);
-    void dtxWorkRecord(const std::string& xid, bool prepared, uint32_t timeout);
-
-    // Encoded exchange replication.
+    // Encoded queue/exchange replication.
+    void queue(const std::string& encoded);
     void exchange(const std::string& encoded);
 
     void giveReadCredit(int credit);
@@ -202,12 +189,6 @@ class Connection :
 
     void setSecureConnection ( broker::SecureConnection * sc );
 
-    void doCatchupIoCallbacks();
-
-    void clock(uint64_t time);
-
-    void queueDequeueSincePurgeState(const std::string&, uint32_t);
-
   private:
     struct NullFrameHandler : public framing::FrameHandler {
         void handle(framing::AMQFrame&) {}
@@ -252,7 +233,7 @@ class Connection :
     // Error listener functions
     void connectionError(const std::string&);
     void sessionError(uint16_t channel, const std::string&);
-
+    
     void init();
     bool checkUnsupported(const framing::AMQBody& body);
     void deliverDoOutput(uint32_t limit);
@@ -264,11 +245,10 @@ class Connection :
     broker::SemanticState& semanticState();
     broker::QueuedMessage getUpdateMessage();
     void closeUpdated();
-    void setDtxBuffer(const UpdateReceiver::DtxBuffers::value_type &);
+
     Cluster& cluster;
     ConnectionId self;
     bool catchUp;
-    bool announced;
     OutputInterceptor output;
     framing::FrameDecoder localDecoder;
     ConnectionCtor connectionCtor;
@@ -276,9 +256,6 @@ class Connection :
     framing::SequenceNumber deliverSeq;
     framing::ChannelId currentChannel;
     boost::shared_ptr<broker::TxBuffer> txBuffer;
-    boost::shared_ptr<broker::DtxBuffer> dtxBuffer;
-    broker::DeliveryRecords dtxAckRecords;
-    broker::DtxWorkRecord* dtxCurrent;
     bool expectProtocolHeader;
     McastFrameHandler mcastFrameHandler;
     UpdateReceiver& updateIn;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/Decoder.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/Decoder.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/Decoder.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/Decoder.h Fri Oct 21 01:19:00 2011
@@ -31,7 +31,7 @@
 namespace qpid {
 namespace cluster {
 
-struct EventFrame;
+class EventFrame;
 class EventHeader;
 
 /**

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/ErrorCheck.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/ErrorCheck.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/ErrorCheck.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/ErrorCheck.h Fri Oct 21 01:19:00 2011
@@ -33,7 +33,7 @@
 namespace qpid {
 namespace cluster {
 
-struct EventFrame;
+class EventFrame;
 class Cluster;
 class Multicaster;
 class Connection;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/Event.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/Event.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/Event.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/Event.cpp Fri Oct 21 01:19:00 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * 
  *   http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,7 +23,6 @@
 #include "qpid/cluster/Cpg.h"
 #include "qpid/framing/Buffer.h"
 #include "qpid/framing/AMQFrame.h"
-#include "qpid/RefCountedBuffer.h"
 #include "qpid/assert.h"
 #include <ostream>
 #include <iterator>

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/Event.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/Event.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/Event.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/Event.h Fri Oct 21 01:19:00 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * 
  *   http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,7 +23,7 @@
  */
 
 #include "qpid/cluster/types.h"
-#include "qpid/BufferRef.h"
+#include "qpid/RefCountedBuffer.h"
 #include "qpid/framing/AMQFrame.h"
 #include <sys/uio.h>            // For iovec
 #include <iosfwd>
@@ -53,7 +53,7 @@ class EventHeader {
 
     /** Size of payload data, excluding header. */
     size_t getSize() const { return size; }
-    /** Size of header + payload. */
+    /** Size of header + payload. */ 
     size_t getStoreSize() const { return size + HEADER_SIZE; }
 
     bool isCluster() const { return connectionId.getNumber() == 0; }
@@ -62,7 +62,7 @@ class EventHeader {
 
   protected:
     static const size_t HEADER_SIZE;
-
+    
     EventType type;
     ConnectionId connectionId;
     size_t size;
@@ -86,25 +86,25 @@ class Event : public EventHeader {
 
     /** Create a control event. */
     static Event control(const framing::AMQFrame&, const ConnectionId&);
-
+    
     // Data excluding header.
-    char* getData() { return store.begin() + HEADER_SIZE; }
-    const char* getData() const { return store.begin() + HEADER_SIZE; }
+    char* getData() { return store + HEADER_SIZE; }
+    const char* getData() const { return store + HEADER_SIZE; }
 
     // Store including header
-    char* getStore() { return store.begin(); }
-    const char* getStore() const { return store.begin(); }
-
-    const framing::AMQFrame& getFrame() const;
+    char* getStore() { return store; }
+    const char* getStore() const { return store; }
 
+    const framing::AMQFrame& getFrame() const;        
+    
     operator framing::Buffer() const;
 
     iovec toIovec() const;
-
+    
   private:
     void encodeHeader() const;
 
-    BufferRef store;
+    RefCountedBuffer::pointer store;
     mutable framing::AMQFrame frame;
 };
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/EventFrame.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/EventFrame.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/EventFrame.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/EventFrame.h Fri Oct 21 01:19:00 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * 
  *   http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -48,7 +48,7 @@ struct EventFrame
 
 
     ConnectionId connectionId;
-    framing::AMQFrame frame;
+    framing::AMQFrame frame;   
     int readCredit; ///< last frame in an event, give credit when processed.
     EventType type;
 };

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/ExpiryPolicy.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/ExpiryPolicy.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/ExpiryPolicy.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/ExpiryPolicy.cpp Fri Oct 21 01:19:00 2011
@@ -21,21 +21,106 @@
 
 #include "qpid/broker/Message.h"
 #include "qpid/cluster/ExpiryPolicy.h"
-#include "qpid/cluster/Cluster.h"
+#include "qpid/cluster/Multicaster.h"
+#include "qpid/framing/ClusterMessageExpiredBody.h"
 #include "qpid/sys/Time.h"
+#include "qpid/sys/Timer.h"
 #include "qpid/log/Statement.h"
 
 namespace qpid {
 namespace cluster {
 
-ExpiryPolicy::ExpiryPolicy(Cluster& cluster) : cluster(cluster) {}
+ExpiryPolicy::ExpiryPolicy(Multicaster& m, const MemberId& id, sys::Timer& t)
+    : expiryId(1), expiredPolicy(new Expired), mcast(m), memberId(id), timer(t) {}
 
+struct ExpiryTask : public sys::TimerTask {
+    ExpiryTask(const boost::intrusive_ptr<ExpiryPolicy>& policy, uint64_t id, sys::AbsTime when)
+        : TimerTask(when,"ExpiryPolicy"), expiryPolicy(policy), expiryId(id) {}
+    void fire() { expiryPolicy->sendExpire(expiryId); }
+    boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
+    const uint64_t expiryId;
+};
+
+// Called while receiving an update
+void ExpiryPolicy::setId(uint64_t id) {
+    sys::Mutex::ScopedLock l(lock);
+    expiryId = id;
+}
+
+// Called while giving an update
+uint64_t ExpiryPolicy::getId() const {
+    sys::Mutex::ScopedLock l(lock);
+    return expiryId;
+}
+
+// Called in enqueuing connection thread
+void ExpiryPolicy::willExpire(broker::Message& m) {
+    uint64_t id;
+    {
+        // When messages are fanned out to multiple queues, update sends
+        // them as independent messages so we can have multiple messages
+        // with the same expiry ID.
+        //
+        sys::Mutex::ScopedLock l(lock);
+        id = expiryId++;
+        if (!id) {              // This is an update of an already-expired message.
+            m.setExpiryPolicy(expiredPolicy);
+        }
+        else {
+            assert(unexpiredByMessage.find(&m) == unexpiredByMessage.end());
+            // If this is an update, the id may already exist
+            unexpiredById.insert(IdMessageMap::value_type(id, &m));
+            unexpiredByMessage[&m] = id;
+        }
+    }
+    timer.add(new ExpiryTask(this, id, m.getExpiration()));
+}
+
+// Called in dequeueing connection thread
+void ExpiryPolicy::forget(broker::Message& m) {
+    sys::Mutex::ScopedLock l(lock);
+    MessageIdMap::iterator i = unexpiredByMessage.find(&m);
+    assert(i != unexpiredByMessage.end());
+    unexpiredById.erase(i->second);
+    unexpiredByMessage.erase(i);
+}
+
+// Called in dequeueing connection or cleanup thread.
 bool ExpiryPolicy::hasExpired(broker::Message& m) {
-    return m.getExpiration() < cluster.getClusterTime();
+    sys::Mutex::ScopedLock l(lock);
+    return unexpiredByMessage.find(&m) == unexpiredByMessage.end();
+}
+
+// Called in timer thread
+void ExpiryPolicy::sendExpire(uint64_t id) {
+    {
+        sys::Mutex::ScopedLock l(lock);
+        // Don't multicast an expiry notice if message is already forgotten.
+        if (unexpiredById.find(id) == unexpiredById.end()) return;
+    }
+    mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(), id), memberId);
 }
 
-sys::AbsTime ExpiryPolicy::getCurrentTime() {
-    return cluster.getClusterTime();
+// Called in CPG deliver thread.
+void ExpiryPolicy::deliverExpire(uint64_t id) {
+    sys::Mutex::ScopedLock l(lock);
+    std::pair<IdMessageMap::iterator, IdMessageMap::iterator> expired = unexpiredById.equal_range(id);
+    IdMessageMap::iterator i = expired.first;
+    while (i != expired.second) {
+        i->second->setExpiryPolicy(expiredPolicy); // hasExpired() == true; 
+        unexpiredByMessage.erase(i->second);
+        unexpiredById.erase(i++);
+    }
 }
 
+// Called in update thread on the updater.
+boost::optional<uint64_t> ExpiryPolicy::getId(broker::Message& m) {
+    sys::Mutex::ScopedLock l(lock);
+    MessageIdMap::iterator i = unexpiredByMessage.find(&m);
+    return i == unexpiredByMessage.end() ? boost::optional<uint64_t>() : i->second;
+}
+
+bool ExpiryPolicy::Expired::hasExpired(broker::Message&) { return true; }
+void ExpiryPolicy::Expired::willExpire(broker::Message&) { }
+
 }} // namespace qpid::cluster

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/ExpiryPolicy.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/ExpiryPolicy.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/ExpiryPolicy.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/ExpiryPolicy.h Fri Oct 21 01:19:00 2011
@@ -36,8 +36,12 @@ namespace broker {
 class Message;
 }
 
+namespace sys {
+class Timer;
+}
+
 namespace cluster {
-class Cluster;
+class Multicaster;
 
 /**
  * Cluster expiry policy
@@ -45,13 +49,43 @@ class Cluster;
 class ExpiryPolicy : public broker::ExpiryPolicy
 {
   public:
-    ExpiryPolicy(Cluster& cluster);
+    ExpiryPolicy(Multicaster&, const MemberId&, sys::Timer&);
 
+    void willExpire(broker::Message&);
     bool hasExpired(broker::Message&);
-    qpid::sys::AbsTime getCurrentTime();
+    void forget(broker::Message&);
+
+    // Send expiration notice to cluster.
+    void sendExpire(uint64_t);
+
+    // Cluster delivers expiry notice.
+    void deliverExpire(uint64_t);
 
+    void setId(uint64_t id);
+    uint64_t getId() const;
+    
+    boost::optional<uint64_t> getId(broker::Message&);
+    
   private:
-    Cluster& cluster;
+    typedef std::map<broker::Message*,  uint64_t> MessageIdMap;
+    // When messages are fanned out to multiple queues, update sends
+    // them as independent messages so we can have multiple messages
+    // with the same expiry ID.
+    typedef std::multimap<uint64_t, broker::Message*> IdMessageMap;
+
+    struct Expired : public broker::ExpiryPolicy {
+        bool hasExpired(broker::Message&);
+        void willExpire(broker::Message&);
+    };
+
+    mutable sys::Mutex lock;
+    MessageIdMap unexpiredByMessage;
+    IdMessageMap unexpiredById;
+    uint64_t expiryId;
+    boost::intrusive_ptr<Expired> expiredPolicy;
+    Multicaster& mcast;
+    MemberId memberId;
+    sys::Timer& timer;
 };
 
 }} // namespace qpid::cluster

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/FailoverExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/FailoverExchange.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/FailoverExchange.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/FailoverExchange.cpp Fri Oct 21 01:19:00 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * 
  *   http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -39,10 +39,8 @@ using namespace broker;
 using namespace framing;
 
 const string FailoverExchange::typeName("amq.failover");
-
-FailoverExchange::FailoverExchange(management::Manageable* parent, Broker* b)
-    : Exchange(typeName, parent, b ), ready(false)
-{
+    
+FailoverExchange::FailoverExchange(management::Manageable* parent, Broker* b)  : Exchange(typeName, parent, b ) {
     if (mgmtExchange != 0)
         mgmtExchange->set_type(typeName);
 }
@@ -55,17 +53,16 @@ void FailoverExchange::setUrls(const vec
 void FailoverExchange::updateUrls(const vector<Url>& u) {
     Lock l(lock);
     urls=u;
-    if (ready && !urls.empty()) {
-        std::for_each(queues.begin(), queues.end(),
-                      boost::bind(&FailoverExchange::sendUpdate, this, _1));
-    }
+    if (urls.empty()) return;
+    std::for_each(queues.begin(), queues.end(),
+                  boost::bind(&FailoverExchange::sendUpdate, this, _1));
 }
 
 string FailoverExchange::getType() const { return typeName; }
 
 bool FailoverExchange::bind(Queue::shared_ptr queue, const string&, const framing::FieldTable*) {
     Lock l(lock);
-    if (ready) sendUpdate(queue);
+    sendUpdate(queue);
     return queues.insert(queue).second;
 }
 
@@ -87,7 +84,7 @@ void FailoverExchange::sendUpdate(const 
     // Called with lock held.
     if (urls.empty()) return;
     framing::Array array(0x95);
-    for (Urls::const_iterator i = urls.begin(); i != urls.end(); ++i)
+    for (Urls::const_iterator i = urls.begin(); i != urls.end(); ++i) 
         array.add(boost::shared_ptr<Str16Value>(new Str16Value(i->str())));
     const ProtocolVersion v;
     boost::intrusive_ptr<Message> msg(new Message);
@@ -99,12 +96,9 @@ void FailoverExchange::sendUpdate(const 
     header.get<MessageProperties>(true)->getApplicationHeaders().setArray(typeName, array);
     AMQFrame headerFrame(header);
     headerFrame.setFirstSegment(false);
-    msg->getFrames().append(headerFrame);
+    msg->getFrames().append(headerFrame);    
     DeliverableMessage(msg).deliverTo(queue);
 }
 
-void FailoverExchange::setReady() {
-    ready = true;
-}
 
 }} // namespace cluster

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/FailoverExchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/FailoverExchange.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/FailoverExchange.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/FailoverExchange.h Fri Oct 21 01:19:00 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * 
  *   http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -46,8 +46,6 @@ class FailoverExchange : public broker::
     void setUrls(const std::vector<Url>&);
     /** Set the URLs and send an update.*/
     void updateUrls(const std::vector<Url>&);
-    /** Flag the failover exchange as ready to generate updates (caught up) */
-    void setReady();
 
     // Exchange overrides
     std::string getType() const;
@@ -58,7 +56,7 @@ class FailoverExchange : public broker::
 
   private:
     void sendUpdate(const boost::shared_ptr<broker::Queue>&);
-
+    
     typedef sys::Mutex::ScopedLock Lock;
     typedef std::vector<Url> Urls;
     typedef std::set<boost::shared_ptr<broker::Queue> > Queues;
@@ -66,7 +64,7 @@ class FailoverExchange : public broker::
     sys::Mutex lock;
     Urls urls;
     Queues queues;
-    bool ready;
+    
 };
 }} // namespace qpid::cluster
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/Multicaster.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/Multicaster.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/Multicaster.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/Multicaster.cpp Fri Oct 21 01:19:00 2011
@@ -21,7 +21,6 @@
 
 #include "qpid/cluster/Multicaster.h"
 #include "qpid/cluster/Cpg.h"
-#include "qpid/cluster/Cluster.h"
 #include "qpid/log/Statement.h"
 #include "qpid/framing/AMQBody.h"
 #include "qpid/framing/AMQFrame.h"
@@ -59,7 +58,7 @@ void Multicaster::mcast(const Event& e) 
             return;
         }
     }
-    QPID_LOG_IF(trace, e.isControl() && Cluster::loggable(e.getFrame()), "MCAST " << e);
+    QPID_LOG(trace, "MCAST " << e);
     if (bypass) {               // direct, don't queue
         iovec iov = e.toIovec();
         while (!cpg.mcast(&iov, 1))

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/OutputInterceptor.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/OutputInterceptor.cpp Fri Oct 21 01:19:00 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * 
  *   http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -45,11 +45,12 @@ void OutputInterceptor::send(framing::AM
 }
 
 void OutputInterceptor::activateOutput() {
-    sys::Mutex::ScopedLock l(lock);
-    if (parent.isCatchUp())
+    if (parent.isCatchUp()) {
+        sys::Mutex::ScopedLock l(lock);
         next->activateOutput();
+    }
     else
-        sendDoOutput(sendMax, l);
+        sendDoOutput(sendMax);
 }
 
 void OutputInterceptor::abort() {
@@ -65,38 +66,29 @@ void OutputInterceptor::giveReadCredit(i
 }
 
 // Called in write thread when the IO layer has no more data to write.
-// We only process IO callbacks in the write thread during catch-up.
-// Normally we run doOutput only on delivery of doOutput requests.
-bool OutputInterceptor::doOutput() {
-    parent.doCatchupIoCallbacks();
-    return false;
-}
+// We do nothing in the write thread; we run doOutput only on delivery
+// of doOutput requests.
+bool OutputInterceptor::doOutput() { return false; }
 
-// Send output up to limit, calculate new limit.
+// Send output up to limit, calculate new limit. 
 void OutputInterceptor::deliverDoOutput(uint32_t limit) {
-    sys::Mutex::ScopedLock l(lock);
     sentDoOutput = false;
     sendMax = limit;
     size_t newLimit = limit;
     if (parent.isLocal()) {
-        size_t buffered = next->getBuffered();
+        size_t buffered = getBuffered();
         if (buffered == 0 && sent == sendMax) // Could have sent more, increase the limit.
-            newLimit = sendMax*2;
+            newLimit = sendMax*2; 
         else if (buffered > 0 && sent > 1) // Data left unsent, reduce the limit.
             newLimit = (sendMax + sent) / 2;
     }
     sent = 0;
-    while (sent < limit) {
-        {
-            sys::Mutex::ScopedUnlock u(lock);
-            if (!parent.getBrokerConnection()->doOutput()) break;
-        }
+    while (sent < limit && parent.getBrokerConnection()->doOutput())
         ++sent;
-    }
-    if (sent == limit) sendDoOutput(newLimit, l);
+    if (sent == limit) sendDoOutput(newLimit);
 }
 
-void OutputInterceptor::sendDoOutput(size_t newLimit, const sys::Mutex::ScopedLock&) {
+void OutputInterceptor::sendDoOutput(size_t newLimit) {
     if (parent.isLocal() && !sentDoOutput && !closing) {
         sentDoOutput = true;
         parent.getCluster().getMulticast().mcastControl(
@@ -105,7 +97,6 @@ void OutputInterceptor::sendDoOutput(siz
     }
 }
 
-// Called in connection thread when local connection closes.
 void OutputInterceptor::closeOutput() {
     sys::Mutex::ScopedLock l(lock);
     closing = true;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/OutputInterceptor.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/OutputInterceptor.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/OutputInterceptor.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/OutputInterceptor.h Fri Oct 21 01:19:00 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * 
  *   http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -58,13 +58,13 @@ class OutputInterceptor : public sys::Co
 
     uint32_t getSendMax() const { return sendMax; }
     void setSendMax(uint32_t sendMax_) { sendMax=sendMax_; }
-
+    
     cluster::Connection& parent;
-
+    
   private:
     typedef sys::Mutex::ScopedLock Locker;
 
-    void sendDoOutput(size_t newLimit, const sys::Mutex::ScopedLock&);
+    void sendDoOutput(size_t newLimit);
 
     mutable sys::Mutex lock;
     bool closing;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/SecureConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/SecureConnectionFactory.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/SecureConnectionFactory.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/SecureConnectionFactory.cpp Fri Oct 21 01:19:00 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * 
  *   http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -48,7 +48,7 @@ SecureConnectionFactory::create(Protocol
     if (clusterCodec) {
         SecureConnectionPtr sc(new SecureConnection());
         clusterCodec->setSecureConnection(sc.get());
-        sc->setCodec(codec);
+        sc->setCodec(codec);        
         return sc.release();
     }
     return 0;
@@ -63,7 +63,7 @@ SecureConnectionFactory::create(sys::Out
     if (clusterCodec) {
         SecureConnectionPtr sc(new SecureConnection());
         clusterCodec->setSecureConnection(sc.get());
-        sc->setCodec(codec);
+        sc->setCodec(codec);        
         return sc.release();
     }
     return 0;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateClient.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateClient.cpp Fri Oct 21 01:19:00 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * 
  *   http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,9 +26,9 @@
 #include "qpid/cluster/Decoder.h"
 #include "qpid/cluster/ExpiryPolicy.h"
 #include "qpid/cluster/UpdateDataExchange.h"
-#include "qpid/client/SessionBase_0_10Access.h"
-#include "qpid/client/ConnectionAccess.h"
-#include "qpid/client/SessionImpl.h"
+#include "qpid/client/SessionBase_0_10Access.h" 
+#include "qpid/client/ConnectionAccess.h" 
+#include "qpid/client/SessionImpl.h" 
 #include "qpid/client/ConnectionImpl.h"
 #include "qpid/client/Future.h"
 #include "qpid/broker/Broker.h"
@@ -45,13 +45,10 @@
 #include "qpid/broker/SessionState.h"
 #include "qpid/broker/TxOpVisitor.h"
 #include "qpid/broker/DtxAck.h"
-#include "qpid/broker/DtxBuffer.h"
-#include "qpid/broker/DtxWorkRecord.h"
 #include "qpid/broker/TxAccept.h"
 #include "qpid/broker/TxPublish.h"
 #include "qpid/broker/RecoveredDequeue.h"
 #include "qpid/broker/RecoveredEnqueue.h"
-#include "qpid/broker/StatefulQueueObserver.h"
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/framing/ClusterConnectionMembershipBody.h"
 #include "qpid/framing/ClusterConnectionShadowReadyBody.h"
@@ -67,7 +64,6 @@
 #include <boost/bind.hpp>
 #include <boost/cast.hpp>
 #include <algorithm>
-#include <iterator>
 #include <sstream>
 
 namespace qpid {
@@ -86,20 +82,11 @@ using namespace framing;
 namespace arg=client::arg;
 using client::SessionBase_0_10Access;
 
-// Reserved exchange/queue name for catch-up, avoid clashes with user queues/exchanges.
-const std::string UpdateClient::UPDATE("x-qpid.cluster-update");
-// Name for header used to carry expiration information.
-const std::string UpdateClient::X_QPID_EXPIRATION = "x-qpid.expiration";
-// Headers used to flag headers/properties added by the UpdateClient so they can be
-// removed on the other side.
-const std::string UpdateClient::X_QPID_NO_MESSAGE_PROPS = "x-qpid.no-message-props";
-const std::string UpdateClient::X_QPID_NO_HEADERS = "x-qpid.no-headers";
-
 std::ostream& operator<<(std::ostream& o, const UpdateClient& c) {
     return o << "cluster(" << c.updaterId << " UPDATER)";
 }
 
-struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection, public framing::FrameHandler
+struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection, public framing::FrameHandler 
 {
     boost::shared_ptr<qpid::client::ConnectionImpl> connection;
 
@@ -133,7 +120,7 @@ void send(client::AsyncSession& s, const
 // TODO aconway 2008-09-24: optimization: update connections/sessions in parallel.
 
 UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const Url& url,
-                           broker::Broker& broker, const ClusterMap& m, ExpiryPolicy& expiry_,
+                           broker::Broker& broker, const ClusterMap& m, ExpiryPolicy& expiry_, 
                            const Cluster::ConnectionVector& cons, Decoder& decoder_,
                            const boost::function<void()>& ok,
                            const boost::function<void(const std::exception&)>& fail,
@@ -147,11 +134,13 @@ UpdateClient::UpdateClient(const MemberI
 
 UpdateClient::~UpdateClient() {}
 
+// Reserved exchange/queue name for catch-up, avoid clashes with user queues/exchanges.
+const std::string UpdateClient::UPDATE("qpid.cluster-update");
+
 void UpdateClient::run() {
     try {
         connection.open(updateeUrl, connectionSettings);
         session = connection.newSession(UPDATE);
-        session.sync();
         update();
         done();
     } catch (const std::exception& e) {
@@ -165,13 +154,6 @@ void UpdateClient::update() {
              << " at " << updateeUrl);
     Broker& b = updaterBroker;
 
-    if(b.getExpiryPolicy()) {
-        QPID_LOG(debug, *this << "Updating updatee with cluster time");
-        qpid::sys::AbsTime clusterTime = b.getExpiryPolicy()->getCurrentTime();
-        int64_t time = qpid::sys::Duration(qpid::sys::EPOCH, clusterTime);
-        ClusterConnectionProxy(session).clock(time);
-    }
-
     updateManagementSetupState();
 
     b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1));
@@ -181,20 +163,16 @@ void UpdateClient::update() {
     // longer on their original queue.
     session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true);
     session.sync();
-
     std::for_each(connections.begin(), connections.end(),
                   boost::bind(&UpdateClient::updateConnection, this, _1));
-
-    // some Queue Observers need session state & msgs synced first, so sync observers now
-    b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueObservers, this, _1));
+    session.queueDelete(arg::queue=UPDATE);
 
     // Update queue listeners: must come after sessions so consumerNumbering is populated
     b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1));
 
+    ClusterConnectionProxy(session).expiryId(expiry.getId());
     updateLinks();
     updateManagementAgent();
-    updateDtxManager();
-    session.queueDelete(arg::queue=UPDATE);
 
     session.close();
 
@@ -206,7 +184,7 @@ void UpdateClient::update() {
 
     // NOTE: connection will be closed from the other end, don't close
     // it here as that causes a race.
-
+    
     // TODO aconway 2010-03-15: This sleep avoids the race condition
     // described in // https://bugzilla.redhat.com/show_bug.cgi?id=568831.
     // It allows the connection to fully close before destroying the
@@ -298,7 +276,7 @@ class MessageUpdater {
     framing::SequenceNumber lastPos;
     client::AsyncSession session;
     ExpiryPolicy& expiry;
-
+    
   public:
 
     MessageUpdater(const string& q, const client::AsyncSession s, ExpiryPolicy& expiry_) : queue(q), haveLastPos(false), session(s), expiry(expiry_) {
@@ -315,6 +293,7 @@ class MessageUpdater {
         }
     }
 
+
     void updateQueuedMessage(const broker::QueuedMessage& message) {
         // Send the queue position if necessary.
         if (!haveLastPos || message.position - lastPos != 1)  {
@@ -323,23 +302,10 @@ class MessageUpdater {
         }
         lastPos = message.position;
 
-        // if the ttl > 0, we need to send the calculated expiration time to the updatee
-        const DeliveryProperties* dprops =
-            message.payload->getProperties<DeliveryProperties>();
-        if (dprops && dprops->getTtl() > 0) {
-            bool hadMessageProps =
-                message.payload->hasProperties<framing::MessageProperties>();
-            const framing::MessageProperties* mprops =
-                message.payload->getProperties<framing::MessageProperties>();
-            bool hadApplicationHeaders = mprops->hasApplicationHeaders();
-            message.payload->insertCustomProperty(UpdateClient::X_QPID_EXPIRATION,
-                            sys::Duration(sys::EPOCH, message.payload->getExpiration()));
-            // If message properties or application headers didn't exist
-            // prior to us adding data, we want to remove them on the other side.
-            if (!hadMessageProps)
-                message.payload->insertCustomProperty(UpdateClient::X_QPID_NO_MESSAGE_PROPS, 0);
-            else if (!hadApplicationHeaders)
-                message.payload->insertCustomProperty(UpdateClient::X_QPID_NO_HEADERS, 0);
+        // Send the expiry ID if necessary.
+        if (message.payload->getProperties<DeliveryProperties>()->getTtl()) {
+            boost::optional<uint64_t> expiryId = expiry.getId(*message.payload);
+            ClusterConnectionProxy(session).expiryId(expiryId?*expiryId:0);
         }
 
         // We can't send a broker::Message via the normal client API,
@@ -352,7 +318,7 @@ class MessageUpdater {
         framing::MessageTransferBody transfer(
             *message.payload->getFrames().as<framing::MessageTransferBody>());
         transfer.setDestination(UpdateClient::UPDATE);
-
+        
         sb.get()->send(transfer, message.payload->getFrames(),
                        !message.payload->isContentReleased());
         if (message.payload->isContentReleased()){
@@ -360,10 +326,9 @@ class MessageUpdater {
             uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
             bool morecontent = true;
             for (uint64_t offset = 0; morecontent; offset += maxContentSize)
-            {
+            {            
                 AMQFrame frame((AMQContentBody()));
-                morecontent = message.payload->getContentFrame(
-                    *(message.queue), frame, maxContentSize, offset);
+                morecontent = message.payload->getContentFrame(*(message.queue), frame, maxContentSize, offset);
                 sb.get()->sendRawFrame(frame);
             }
         }
@@ -392,8 +357,6 @@ void UpdateClient::updateQueue(client::A
     if (qpid::broker::Fairshare::getState(q->getMessages(), priority, count)) {
         ClusterConnectionProxy(s).queueFairshareState(q->getName(), priority, count);
     }
-
-    ClusterConnectionProxy(s).queueDequeueSincePurgeState(q->getName(), q->getDequeueSincePurge());
 }
 
 void UpdateClient::updateExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) {
@@ -409,11 +372,7 @@ void UpdateClient::updateNonExclusiveQue
 }
 
 void UpdateClient::updateBinding(client::AsyncSession& s, const std::string& queue, const QueueBinding& binding) {
-    if (binding.exchange.size())
-        s.exchangeBind(queue, binding.exchange, binding.key, binding.args);
-    //else its the default exchange and there is no need to replicate
-    //the binding, the creation of the queue will have done so
-    //automatically
+    s.exchangeBind(queue, binding.exchange, binding.key, binding.args);
 }
 
 void UpdateClient::updateOutputTask(const sys::OutputTask* task) {
@@ -421,8 +380,8 @@ void UpdateClient::updateOutputTask(cons
         boost::polymorphic_downcast<const SemanticState::ConsumerImpl*> (task);
     SemanticState::ConsumerImpl* ci = const_cast<SemanticState::ConsumerImpl*>(cci);
     uint16_t channel =  ci->getParent().getSession().getChannel();
-    ClusterConnectionProxy(shadowConnection).outputTask(channel,  ci->getTag());
-    QPID_LOG(debug, *this << " updating output task " << ci->getTag()
+    ClusterConnectionProxy(shadowConnection).outputTask(channel,  ci->getName());
+    QPID_LOG(debug, *this << " updating output task " << ci->getName()
              << " channel=" << channel);
 }
 
@@ -430,7 +389,7 @@ void UpdateClient::updateConnection(cons
     QPID_LOG(debug, *this << " updating connection " << *updateConnection);
     assert(updateConnection->getBrokerConnection());
     broker::Connection& bc = *updateConnection->getBrokerConnection();
-
+    
     // Send the management ID first on the main connection.
     std::string mgmtId = updateConnection->getBrokerConnection()->getMgmtId();
     ClusterConnectionProxy(session).shadowPrepare(mgmtId);
@@ -467,7 +426,7 @@ void UpdateClient::updateSession(broker:
 
     QPID_LOG(debug, *this << " updating session " << ss->getId());
 
-    // Create a client session to update session state.
+    // Create a client session to update session state. 
     boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection);
     boost::shared_ptr<client::SessionImpl> simpl = cimpl->newSession(ss->getId().getName(), ss->getTimeout(), sh.getChannel());
     simpl->disableAutoDetach();
@@ -486,19 +445,19 @@ void UpdateClient::updateSession(broker:
     QPID_LOG(debug, *this << " updating unacknowledged messages.");
     broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked();
     std::for_each(drs.begin(), drs.end(),
-                  boost::bind(&UpdateClient::updateUnacked, this, _1, shadowSession));
+                  boost::bind(&UpdateClient::updateUnacked, this, _1));
 
-    updateTransactionState(ss->getSemanticState());
+    updateTxState(ss->getSemanticState());           // Tx transaction state.
 
     // Adjust command counter for message in progress, will be sent after state update.
     boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress();
     SequenceNumber received = ss->receiverGetReceived().command;
-    if (inProgress)
+    if (inProgress)  
         --received;
 
     // Sync the session to ensure all responses from broker have been processed.
     shadowSession.sync();
-
+    
     // Reset command-sequence state.
     proxy.sessionState(
         ss->senderGetReplayPoint().command,
@@ -507,8 +466,7 @@ void UpdateClient::updateSession(broker:
         std::max(received, ss->receiverGetExpected().command),
         received,
         ss->receiverGetUnknownComplete(),
-        ss->receiverGetIncomplete(),
-        ss->getSemanticState().getDtxSelected()
+        ss->receiverGetIncomplete()
     );
 
     // Send frames for partial message in progress.
@@ -521,13 +479,13 @@ void UpdateClient::updateSession(broker:
 void UpdateClient::updateConsumer(
     const broker::SemanticState::ConsumerImpl::shared_ptr& ci)
 {
-    QPID_LOG(debug, *this << " updating consumer " << ci->getTag() << " on "
+    QPID_LOG(debug, *this << " updating consumer " << ci->getName() << " on "
              << shadowSession.getId());
 
     using namespace message;
     shadowSession.messageSubscribe(
         arg::queue       = ci->getQueue()->getName(),
-        arg::destination = ci->getTag(),
+        arg::destination = ci->getName(),
         arg::acceptMode  = ci->isAckExpected() ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE,
         arg::acquireMode = ci->isAcquire() ? ACQUIRE_MODE_PRE_ACQUIRED : ACQUIRE_MODE_NOT_ACQUIRED,
         arg::exclusive   = ci->isExclusive(),
@@ -535,32 +493,29 @@ void UpdateClient::updateConsumer(
         arg::resumeTtl   = ci->getResumeTtl(),
         arg::arguments   = ci->getArguments()
     );
-    shadowSession.messageSetFlowMode(ci->getTag(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT);
-    shadowSession.messageFlow(ci->getTag(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit());
-    shadowSession.messageFlow(ci->getTag(), CREDIT_UNIT_BYTE, ci->getByteCredit());
+    shadowSession.messageSetFlowMode(ci->getName(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT);
+    shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit());
+    shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_BYTE, ci->getByteCredit());
     ClusterConnectionProxy(shadowSession).consumerState(
-        ci->getTag(),
+        ci->getName(),
         ci->isBlocked(),
         ci->isNotifyEnabled(),
         ci->position
     );
     consumerNumbering.add(ci.get());
 
-    QPID_LOG(debug, *this << " updated consumer " << ci->getTag()
+    QPID_LOG(debug, *this << " updated consumer " << ci->getName()
              << " on " << shadowSession.getId());
 }
-
-void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr,
-                                 client::AsyncSession& updateSession)
-{
-    if (!dr.isEnded() && dr.isAcquired()) {
-        assert(dr.getMessage().payload);
+    
+void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr) {
+    if (!dr.isEnded() && dr.isAcquired() && dr.getMessage().payload) {
         // If the message is acquired then it is no longer on the
         // updatees queue, put it on the update queue for updatee to pick up.
         //
-        MessageUpdater(UPDATE, updateSession, expiry).updateQueuedMessage(dr.getMessage());
+        MessageUpdater(UPDATE, shadowSession, expiry).updateQueuedMessage(dr.getMessage());
     }
-    ClusterConnectionProxy(updateSession).deliveryRecord(
+    ClusterConnectionProxy(shadowSession).deliveryRecord(
         dr.getQueue()->getName(),
         dr.getMessage().position,
         dr.getTag(),
@@ -581,12 +536,10 @@ class TxOpUpdater : public broker::TxOpC
     TxOpUpdater(UpdateClient& dc, client::AsyncSession s, ExpiryPolicy& expiry)
         : MessageUpdater(UpdateClient::UPDATE, s, expiry), parent(dc), session(s), proxy(s) {}
 
-    void operator()(const broker::DtxAck& ack) {
-        std::for_each(ack.getPending().begin(), ack.getPending().end(),
-                      boost::bind(&UpdateClient::updateUnacked, &parent, _1, session));
-        proxy.dtxAck();
+    void operator()(const broker::DtxAck& ) {
+        throw InternalErrorException("DTX transactions not currently supported by cluster.");
     }
-
+    
     void operator()(const broker::RecoveredDequeue& rdeq) {
         updateMessage(rdeq.getMessage());
         proxy.txEnqueue(rdeq.getQueue()->getName());
@@ -601,18 +554,13 @@ class TxOpUpdater : public broker::TxOpC
         proxy.txAccept(txAccept.getAcked());
     }
 
-    typedef std::list<Queue::shared_ptr> QueueList;
-
-    void copy(const QueueList& l, Array& a) {
-        for (QueueList::const_iterator i = l.begin(); i!=l.end(); ++i)
-            a.push_back(Array::ValuePtr(new Str8Value((*i)->getName())));
-    }
-
     void operator()(const broker::TxPublish& txPub) {
         updateMessage(txPub.getMessage());
-        assert(txPub.getQueues().empty() || txPub.getPrepared().empty());
+        typedef std::list<Queue::shared_ptr> QueueList;
+        const QueueList& qlist = txPub.getQueues();
         Array qarray(TYPE_CODE_STR8);
-        copy(txPub.getQueues().empty() ? txPub.getPrepared() : txPub.getQueues(), qarray);
+        for (QueueList::const_iterator i = qlist.begin(); i != qlist.end(); ++i) 
+            qarray.push_back(Array::ValuePtr(new Str8Value((*i)->getName())));
         proxy.txPublish(qarray, txPub.delivered);
     }
 
@@ -621,44 +569,18 @@ class TxOpUpdater : public broker::TxOpC
     client::AsyncSession session;
     ClusterConnectionProxy proxy;
 };
-
-void UpdateClient::updateBufferRef(const broker::DtxBuffer::shared_ptr& dtx,bool suspended)
-{
-    ClusterConnectionProxy proxy(shadowSession);
-    broker::DtxWorkRecord* record =
-        updaterBroker.getDtxManager().getWork(dtx->getXid());
-    proxy.dtxBufferRef(dtx->getXid(), record->indexOf(dtx), suspended);
-
-}
-
-void UpdateClient::updateTransactionState(broker::SemanticState& s) {
+    
+void UpdateClient::updateTxState(broker::SemanticState& s) {
+    QPID_LOG(debug, *this << " updating TX transaction state.");
     ClusterConnectionProxy proxy(shadowSession);
     proxy.accumulatedAck(s.getAccumulatedAck());
-    broker::TxBuffer::shared_ptr tx = s.getTxBuffer();
-    broker::DtxBuffer::shared_ptr dtx = s.getDtxBuffer();
-    if (dtx) {
-        updateBufferRef(dtx, false); // Current transaction.
-    } else if (tx) {
+    broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer();
+    if (txBuffer) {
         proxy.txStart();
         TxOpUpdater updater(*this, shadowSession, expiry);
-        tx->accept(updater);
+        txBuffer->accept(updater);
         proxy.txEnd();
     }
-    for (SemanticState::DtxBufferMap::iterator i = s.getSuspendedXids().begin();
-         i != s.getSuspendedXids().end();
-         ++i)
-    {
-        updateBufferRef(i->second, true);
-    }
-}
-
-void UpdateClient::updateDtxBuffer(const broker::DtxBuffer::shared_ptr& dtx) {
-    ClusterConnectionProxy proxy(session);
-    proxy.dtxStart(
-        dtx->getXid(), dtx->isEnded(), dtx->isSuspended(), dtx->isFailed(), dtx->isExpired());
-    TxOpUpdater updater(*this, session, expiry);
-    dtx->accept(updater);
-    proxy.dtxEnd();
 }
 
 void UpdateClient::updateQueueListeners(const boost::shared_ptr<broker::Queue>& queue) {
@@ -693,35 +615,4 @@ void UpdateClient::updateBridge(const bo
     ClusterConnectionProxy(session).config(encode(*bridge));
 }
 
-void UpdateClient::updateQueueObservers(const boost::shared_ptr<broker::Queue>& q)
-{
-    q->eachObserver(boost::bind(&UpdateClient::updateObserver, this, q, _1));
-}
-
-void UpdateClient::updateObserver(const boost::shared_ptr<broker::Queue>& q,
-                                        boost::shared_ptr<broker::QueueObserver> o)
-{
-    qpid::framing::FieldTable state;
-    broker::StatefulQueueObserver *so = dynamic_cast<broker::StatefulQueueObserver *>(o.get());
-    if (so) {
-        so->getState( state );
-        std::string id(so->getId());
-        QPID_LOG(debug, *this << " updating queue " << q->getName() << "'s observer " << id);
-        ClusterConnectionProxy(session).queueObserverState( q->getName(), id, state );
-    }
-}
-
-void UpdateClient::updateDtxManager() {
-    broker::DtxManager& dtm = updaterBroker.getDtxManager();
-    dtm.each(boost::bind(&UpdateClient::updateDtxWorkRecord, this, _1));
-}
-
-void UpdateClient::updateDtxWorkRecord(const broker::DtxWorkRecord& r) {
-    QPID_LOG(debug, *this << " updating DTX transaction: " << r.getXid());
-    for (size_t i = 0; i < r.size(); ++i)
-        updateDtxBuffer(r[i]);
-    ClusterConnectionProxy(session).dtxWorkRecord(
-        r.getXid(), r.isPrepared(), r.getTimeout());
-}
-
 }} // namespace qpid::cluster

Propchange: qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateClient.cpp
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 21 01:19:00 2011
@@ -1,3 +0,0 @@
-/qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/UpdateClient.cpp:1061302-1072333
-/qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/UpdateClient.cpp:1144319-1179855
-/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp:1072051-1185907

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateClient.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateClient.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateClient.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateClient.h Fri Oct 21 01:19:00 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * 
  *   http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -34,7 +34,7 @@
 
 namespace qpid {
 
-struct Url;
+class Url;
 
 namespace broker {
 
@@ -42,8 +42,8 @@ class Broker;
 class Queue;
 class Exchange;
 class QueueBindings;
-struct QueueBinding;
-struct QueuedMessage;
+class QueueBinding;
+class QueuedMessage;
 class SessionHandler;
 class DeliveryRecord;
 class SessionState;
@@ -51,8 +51,7 @@ class SemanticState;
 class Decoder;
 class Link;
 class Bridge;
-class QueueObserver;
-class DtxBuffer;
+
 } // namespace broker
 
 namespace cluster {
@@ -69,26 +68,21 @@ class ExpiryPolicy;
 class UpdateClient : public sys::Runnable {
   public:
     static const std::string UPDATE; // Name for special update queue and exchange.
-    static const std::string X_QPID_EXPIRATION; // Update message expiration
-    // Flag to remove props/headers that were added by the UpdateClient
-    static const std::string X_QPID_NO_MESSAGE_PROPS;
-    static const std::string X_QPID_NO_HEADERS;
-
     static client::Connection catchUpConnection();
-
+    
     UpdateClient(const MemberId& updater, const MemberId& updatee, const Url&,
                  broker::Broker& donor, const ClusterMap& map, ExpiryPolicy& expiry,
                  const std::vector<boost::intrusive_ptr<Connection> >&, Decoder&,
                  const boost::function<void()>& done,
                  const boost::function<void(const std::exception&)>& fail,
-                 const client::ConnectionSettings&
+                 const client::ConnectionSettings& 
     );
 
     ~UpdateClient();
     void update();
     void run();                 // Will delete this when finished.
 
-    void updateUnacked(const broker::DeliveryRecord&, client::AsyncSession&);
+    void updateUnacked(const broker::DeliveryRecord&);
 
   private:
     void updateQueue(client::AsyncSession&, const boost::shared_ptr<broker::Queue>&);
@@ -100,8 +94,7 @@ class UpdateClient : public sys::Runnabl
     void updateBinding(client::AsyncSession&, const std::string& queue, const broker::QueueBinding& binding);
     void updateConnection(const boost::intrusive_ptr<Connection>& connection);
     void updateSession(broker::SessionHandler& s);
-    void updateBufferRef(const broker::DtxBuffer::shared_ptr& dtx, bool suspended);
-    void updateTransactionState(broker::SemanticState& s);
+    void updateTxState(broker::SemanticState& s);
     void updateOutputTask(const sys::OutputTask* task);
     void updateConsumer(const broker::SemanticState::ConsumerImpl::shared_ptr&);
     void updateQueueListeners(const boost::shared_ptr<broker::Queue>&);
@@ -111,11 +104,6 @@ class UpdateClient : public sys::Runnabl
     void updateLinks();
     void updateLink(const boost::shared_ptr<broker::Link>&);
     void updateBridge(const boost::shared_ptr<broker::Bridge>&);
-    void updateQueueObservers(const boost::shared_ptr<broker::Queue>&);
-    void updateObserver(const boost::shared_ptr<broker::Queue>&, boost::shared_ptr<broker::QueueObserver>);
-    void updateDtxManager();
-    void updateDtxBuffer(const boost::shared_ptr<broker::DtxBuffer>& );
-    void updateDtxWorkRecord(const broker::DtxWorkRecord&);
 
 
     Numbering<broker::SemanticState::ConsumerImpl*> consumerNumbering;

Propchange: qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateClient.h
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 21 01:19:00 2011
@@ -1,2 +0,0 @@
-/qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/UpdateClient.h:1061302-1072333
-/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h:1072051-1185907

Modified: qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateDataExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateDataExchange.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateDataExchange.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/cluster/UpdateDataExchange.cpp Fri Oct 21 01:19:00 2011
@@ -36,8 +36,13 @@ const std::string UpdateDataExchange::MA
 const std::string UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY("management-schemas");
 const std::string UpdateDataExchange::MANAGEMENT_DELETED_OBJECTS_KEY("management-deleted-objects");
 
+std::ostream& operator<<(std::ostream& o, const UpdateDataExchange& c) {
+    return o << "cluster(" << c.clusterId << " UPDATER)";
+}
+
 UpdateDataExchange::UpdateDataExchange(Cluster& cluster) :
-    Exchange(EXCHANGE_NAME, &cluster)
+    Exchange(EXCHANGE_NAME, &cluster),
+    clusterId(cluster.getId())
 {}
 
 void UpdateDataExchange::route(broker::Deliverable& msg, const std::string& routingKey,
@@ -57,9 +62,11 @@ void UpdateDataExchange::updateManagemen
 
     framing::Buffer buf1(const_cast<char*>(managementAgents.data()), managementAgents.size());
     agent->importAgents(buf1);
+    QPID_LOG(debug, *this << " updated management agents.");
 
     framing::Buffer buf2(const_cast<char*>(managementSchemas.data()), managementSchemas.size());
     agent->importSchemas(buf2);
+    QPID_LOG(debug, *this << " updated management schemas.");
 
     using amqp_0_10::ListCodec;
     using types::Variant;
@@ -71,6 +78,7 @@ void UpdateDataExchange::updateManagemen
                               new management::ManagementAgent::DeletedObject(*i)));
     }
     agent->importDeletedObjects(objects);
+    QPID_LOG(debug, *this << " updated management deleted objects.");
 }
 
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org