You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/03/09 00:52:35 UTC

svn commit: r751557 - in /qpid/trunk/qpid/cpp: src/ src/qpid/cluster/ src/qpid/framing/ xml/

Author: aconway
Date: Sun Mar  8 23:52:35 2009
New Revision: 751557

URL: http://svn.apache.org/viewvc?rev=751557&view=rev
Log:
Fixed race conditions in cluster.

Execute all cluster logic in frameDeliverQueue thread,
decoding only in eventDeliverQueue thread.

Added:
    qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp   (with props)
    qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h   (with props)
    qpid/trunk/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h   (with props)
Removed:
    qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h
Modified:
    qpid/trunk/qpid/cpp/src/cluster.mk
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h
    qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp
    qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: qpid/trunk/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/cluster.mk?rev=751557&r1=751556&r2=751557&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ qpid/trunk/qpid/cpp/src/cluster.mk Sun Mar  8 23:52:35 2009
@@ -40,6 +40,8 @@
   $(CMAN_SOURCES)				\
   qpid/cluster/Cluster.cpp			\
   qpid/cluster/Cluster.h			\
+  qpid/cluster/Decoder.cpp			\
+  qpid/cluster/Decoder.h			\
   qpid/cluster/PollableQueue.h			\
   qpid/cluster/ClusterMap.cpp			\
   qpid/cluster/ClusterMap.h			\
@@ -49,8 +51,6 @@
   qpid/cluster/Connection.h			\
   qpid/cluster/ConnectionCodec.cpp		\
   qpid/cluster/ConnectionCodec.h		\
-  qpid/cluster/ConnectionMap.h			\
-  qpid/cluster/ConnectionMap.cpp		\
   qpid/cluster/Cpg.cpp				\
   qpid/cluster/Cpg.h				\
   qpid/cluster/Dispatchable.h			\
@@ -65,6 +65,7 @@
   qpid/cluster/FailoverExchange.cpp		\
   qpid/cluster/FailoverExchange.h		\
   qpid/cluster/UpdateExchange.h			\
+  qpid/cluster/LockedConnectionMap.h		\
   qpid/cluster/Multicaster.cpp			\
   qpid/cluster/Multicaster.h			\
   qpid/cluster/McastFrameHandler.h		\

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=751557&r1=751556&r2=751557&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Sun Mar  8 23:52:35 2009
@@ -107,11 +107,11 @@
                       boost::bind(&Cluster::leave, this),
                       "Error delivering frames",
                       poller),
-    connections(*this),
-    frameId(0),
     initialized(false),
+    decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
+    discarding(true),
     state(INIT),
-    eventId(0),
+    frameId(0),
     lastSize(0),
     lastBroker(false)
 {
@@ -156,14 +156,19 @@
 
 // Called in connection thread to insert a client connection.
 void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) {
-    connections.insert(c);
+    localConnections.insert(c);
 }
 
 // Called in connection thread to insert an updated shadow connection.
 void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) {
-    connections.insert(c);
+    // Safe to use connections here because we're pre-catchup, either
+    // discarding or stalled, so deliveredFrame is not processing any
+    // connection events.
+    assert(discarding);         
+    connections.insert(ConnectionMap::value_type(c->getId(), c));
 }
 
+// Called by Connection::deliverClose() in deliverFrameQueue thread.
 void Cluster::erase(const ConnectionId& id) {
     connections.erase(id);
 }
@@ -195,7 +200,6 @@
     if (state != LEFT) {
         state = LEFT;
         QPID_LOG(notice, *this << " leaving cluster " << name);
-        connections.clear();
         try { broker.shutdown(); }
         catch (const std::exception& e) {
             QPID_LOG(critical, *this << " error during broker shutdown: " << e.what());
@@ -217,58 +221,89 @@
     Event e(Event::decodeCopy(from, buf));
     if (from == self)  // Record self-deliveries for flow control.
         mcast.selfDeliver(e);
-    deliver(e);
+    deliverEvent(e);
 }
 
-void Cluster::deliver(const Event& e) {
+void Cluster::deliverEvent(const Event& e) {
     deliverEventQueue.push(e);
 }
 
+void Cluster::deliverFrame(const EventFrame& e) {
+    deliverFrameQueue.push(e);
+}
+
 // Handler for deliverEventQueue.
-// This thread executes cluster controls and decodes connection data events.
-void Cluster::deliveredEvent(const Event& event) {
-    Event e(event);
-    Mutex::ScopedLock l(lock);
-    if (state >= CATCHUP) {
-        e.setId(++eventId);
+// This thread decodes frames from events.
+void Cluster::deliveredEvent(const Event& e) {
         QPID_LOG(trace, *this << " DLVR: " << e);
-    }
-    if (e.isCluster()) {        // Cluster control, process in this thread.
+    if (e.isCluster()) {
         EventFrame ef(e, e.getFrame());
-        QPID_LOG(trace, *this << " DLVR:  " << ef);
-        ClusterDispatcher dispatch(*this, e.getConnectionId().getMember(), l);
-        if (!framing::invoke(dispatch, *ef.frame.getBody()).wasHandled())
-            throw Exception(QPID_MSG("Invalid cluster control"));
-    }
-    else if (state >= CATCHUP) { // Handle connection frames  
-        if (e.getType() == CONTROL) 
-            connectionFrame(EventFrame(e, e.getFrame()));
+        // Stop the deliverEventQueue on update offers.
+        // This preserves the connection decoder fragments for an update.
+        ClusterUpdateOfferBody* offer = dynamic_cast<ClusterUpdateOfferBody*>(ef.frame.getBody());
+        if (offer)
+            deliverEventQueue.stop();
+        deliverFrame(ef);
+    }
+    else if(!discarding) {    
+        if (e.isControl())
+            deliverFrame(EventFrame(e, e.getFrame()));
         else
-            connections.decode(e, e.getData());
-    }
-    // Drop connection frames while state < CATCHUP
+            decoder.decode(e, e.getData());
 }
-
-void Cluster::connectionFrame(const EventFrame& frame) {
-    deliverFrameQueue.push(frame);
+    else // Discard connection events if discarding is set.
+        QPID_LOG(trace, *this << " DROP: " << e);
 }
 
 // Handler for deliverFrameQueue.
-// This thread executes connection control and data frames.
-void Cluster::deliveredFrame(const EventFrame& event) {
-    // No lock, only use connections, not Cluster state.
-    EventFrame e(event);
-    if(!e.frame.getBody()) {    // marks the stall point, start the update task.
-        updateThread=Thread(*updateTask);
+// This thread executes the main logic.
+void Cluster::deliveredFrame(const EventFrame& e) {
+    Mutex::ScopedLock l(lock);
+    if (e.isCluster()) {
+        QPID_LOG(trace, *this << " DLVR: " << e);
+        ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l);
+        if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled())
+            throw Exception(QPID_MSG("Invalid cluster control"));
     }
-    else {
+    else if (state >= CATCHUP) {
         QPID_LOG(trace, *this << " DLVR:  " << e);
-        if (e.type == DATA)         // Add cluster-id to to data frames.
-            e.frame.setClusterId(frameId++); 
-        boost::intrusive_ptr<Connection> connection = connections.get(e.connectionId);
+        EventFrame ef(e);       // Non-const copy
+        if (ef.type == DATA)         // Add cluster-id to to data frames.
+            ef.frame.setClusterId(frameId++);  
+        ConnectionPtr connection = getConnection(e.connectionId, l);
         if (connection)
             connection->deliveredFrame(e);
     }
+    else // Drop connection frames while state < CATCHUP
+        QPID_LOG(trace, *this << " DROP: " << e);        
+}
+
+// Called in deliverFrameQueue thread
+ConnectionPtr Cluster::getConnection(const ConnectionId& id, Lock&) {
+    ConnectionPtr cp;
+    ConnectionMap::iterator i = connections.find(id);
+    if (i != connections.end())
+        cp = i->second;
+    else {
+        if(id.getMember() == self) 
+            cp = localConnections.getErase(id);
+        else {
+            // New remote connection, create a shadow.
+            std::ostringstream mgmtId;
+            mgmtId << id;
+            cp = new Connection(*this, shadowOut, mgmtId.str(), id);
+        }
+        if (cp)
+            connections.insert(ConnectionMap::value_type(id, cp));
+    }
+    return cp;
+}
+
+Cluster::ConnectionVector Cluster::getConnections(Lock&) {
+    ConnectionVector result(connections.size());
+    std::transform(connections.begin(), connections.end(), result.begin(),
+                   boost::bind(&ConnectionMap::value_type::second, _1));
+    return result;
 }
   
 struct AddrList {
@@ -316,7 +351,7 @@
     std::string addresses;
     for (cpg_address* p = current; p < current+nCurrent; ++p) 
         addresses.append(MemberId(*p).str());
-    deliver(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), self));
+    deliverEvent(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), self));
 }
 
 void Cluster::setReady(Lock&) {
@@ -337,6 +372,7 @@
     if (state == INIT) {        // First configChange
         if (map.aliveCount() == 1) {
             setClusterId(true, l);
+            discarding = false;
             setReady(l);
             map = ClusterMap(self, myUrl, true);
             memberUpdate(l);
@@ -396,28 +432,18 @@
     }
 }
 
-void Cluster::stall(Lock&) {
-    // Stop processing the deliveredEventQueue in order to send or
-    // recieve an update.
-    deliverEventQueue.stop();
-}
-
-void Cluster::unstall(Lock&) {
-    // Stop processing the deliveredEventQueue in order to send or
-    // recieve an update.
-    deliverEventQueue.start();
-}
-
 void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uuid& uuid, Lock& l) {
+    // NOTE: deliverEventQueue has been stopped at the update offer by
+    // deliveredEvent in case an update is required.
     if (state == LEFT) return;
     MemberId updatee(updateeInt);
     boost::optional<Url> url = map.updateOffer(updater, updatee);
     if (updater == self) {
         assert(state == OFFER);
-        if (url) {              // My offer was first.
+        if (url)               // My offer was first.
             updateStart(updatee, *url, l);
-        }
         else {                  // Another offer was first.
+            deliverEventQueue.start(); // Don't need to update
             setReady(l);
             QPID_LOG(info, *this << " cancelled update offer to " << updatee);
             makeOffer(map.firstJoiner(), l); // Maybe make another offer.
@@ -428,50 +454,48 @@
         setClusterId(uuid, l);
         state = UPDATEE;
         QPID_LOG(info, *this << " receiving update from " << updater);
-        stall(l);
         checkUpdateIn(l);
     }
+    else
+        deliverEventQueue.start(); // Don't need to update
 }
 
 void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) {
+    // NOTE: deliverEventQueue is already stopped at the stall point by deliveredEvent.
     if (state == LEFT) return;
     assert(state == OFFER);
     state = UPDATER;
-    QPID_LOG(info, *this << " stall for update to " << updatee << " at " << url);
-    stall(l);
-
+    QPID_LOG(info, *this << " sending update to " << updatee << " at " << url);
     if (updateThread.id())
         updateThread.join(); // Join the previous updateThread to avoid leaks.
     client::ConnectionSettings cs;
     cs.username = settings.username;
     cs.password = settings.password;
     cs.mechanism = settings.mechanism;
-    updateTask = new UpdateClient(self, updatee, url, broker, map, eventId, frameId, connections.values(),
+    updateThread = Thread(
+        new UpdateClient(self, updatee, url, broker, map, frameId, getConnections(l), decoder,
                          boost::bind(&Cluster::updateOutDone, this),
                          boost::bind(&Cluster::updateOutError, this, _1),
-                         cs);
-    // Push an empty frame onto the deliverFrameQueue to mark the stall point.
-    // The deliverFrameQueue thread will start the update at that point.
-    deliverFrameQueue.push(EventFrame(EventHeader(), AMQFrame()));
+                         cs));
 }
 
 // Called in update thread.
-void Cluster::updateInDone(const ClusterMap& m, uint64_t eventId_, uint64_t frameId_) {
+void Cluster::updateInDone(const ClusterMap& m, uint64_t frameId_) {
     Lock l(lock);
     updatedMap = m;
-    eventId = eventId_;
-    // Safe to use frameId here because we are stalled: deliveredFrame cannot be called concurrently.
+    // Safe to set frameId here because we are stalled: deliveredFrame cannot be called concurrently.
     frameId = frameId_;
     checkUpdateIn(l);
 }
 
-void Cluster::checkUpdateIn(Lock& l) {
+void Cluster::checkUpdateIn(Lock&) {
     if (state == UPDATEE && updatedMap) {
         map = *updatedMap;
         mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
         state = CATCHUP;
+        discarding = false;     // ok to set, we're stalled for update.
         QPID_LOG(info, *this << " received update, starting catch-up");
-        unstall(l);
+        deliverEventQueue.start();
     }
 }
 
@@ -485,7 +509,7 @@
     assert(state == UPDATER);
     state = READY;
     mcast.release();
-    unstall(l);
+    deliverEventQueue.start();       // Start processing events again.
     makeOffer(map.firstJoiner(), l); // Try another offer
 }
 
@@ -569,15 +593,13 @@
         mgmtObject->set_memberIDs(idstr);
     }
 
-    // Generate a deliver-close control frame for connections
-    // belonging to defunct members, so they will be erased in the
-    // deliverFrameQueue thread.
-    ConnectionMap::Vector c = connections.values();
-    for (ConnectionMap::Vector::iterator i = c.begin(); i != c.end(); ++i) {
-        ConnectionId cid = (*i)->getId();
-        MemberId mid = cid.getMember();
-        if (mid != self && !map.isMember(mid))
-            connectionFrame(EventFrame(EventHeader(CONTROL, cid), AMQFrame(ClusterConnectionDeliverCloseBody())));
+    // Erase connections belonging to members that have left the cluster.
+    ConnectionMap::iterator i = connections.begin();
+    while (i != connections.end()) {
+        ConnectionMap::iterator j = i++;
+        MemberId m = j->second->getId().getMember();
+        if (m != self && !map.isMember(m))
+            connections.erase(j);
     }
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=751557&r1=751556&r2=751557&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Sun Mar  8 23:52:35 2009
@@ -19,33 +19,34 @@
  *
  */
 
-#include "ClusterSettings.h"
 #include "ClusterMap.h"
-#include "ConnectionMap.h"
+#include "ClusterSettings.h"
 #include "Cpg.h"
+#include "Decoder.h"
 #include "Event.h"
+#include "EventFrame.h"
+#include "ExpiryPolicy.h"
 #include "FailoverExchange.h"
+#include "LockedConnectionMap.h"
 #include "Multicaster.h"
-#include "EventFrame.h"
 #include "NoOpConnectionOutputHandler.h"
+#include "PollableQueue.h"
 #include "PollerDispatch.h"
 #include "Quorum.h"
-#include "PollableQueue.h"
-#include "ExpiryPolicy.h"
 
+#include "qmf/org/apache/qpid/cluster/Cluster.h"
+#include "qpid/Url.h"
 #include "qpid/broker/Broker.h"
-#include "qpid/sys/Monitor.h"
 #include "qpid/management/Manageable.h"
-#include "qpid/Url.h"
-#include "qmf/org/apache/qpid/cluster/Cluster.h"
+#include "qpid/sys/Monitor.h"
 
-#include <boost/intrusive_ptr.hpp>
 #include <boost/bind.hpp>
+#include <boost/intrusive_ptr.hpp>
 #include <boost/optional.hpp>
 
 #include <algorithm>
-#include <vector>
 #include <map>
+#include <vector>
 
 namespace qpid {
 
@@ -57,6 +58,7 @@
 namespace cluster {
 
 class Connection;
+class EventFrame;
 
 /**
  * Connection to the cluster
@@ -64,7 +66,7 @@
 class Cluster : private Cpg::Handler, public management::Manageable {
   public:
     typedef boost::intrusive_ptr<Connection> ConnectionPtr;
-    typedef std::vector<ConnectionPtr> Connections;
+    typedef std::vector<ConnectionPtr> ConnectionVector;
 
     // Public functions are thread safe unless otherwise mentioned in a comment.
 
@@ -90,7 +92,7 @@
     void leave();
 
     // Update completed - called in update thread
-    void updateInDone(const ClusterMap&, uint64_t eventId, uint64_t frameId);
+    void updateInDone(const ClusterMap&, uint64_t frameId);
 
     MemberId getId() const;
     broker::Broker& getBroker() const;
@@ -101,15 +103,19 @@
     size_t getReadMax() { return readMax; }
     size_t getWriteEstimate() { return writeEstimate; }
 
-    // Process a connection frame. Called by Connection with decoded frames.
-    // Thread safety: only called in deliverEventQueue thread.
-    void connectionFrame(const EventFrame&); 
+    void deliverFrame(const EventFrame&);
+
+    // Called only during update by Connection::shadowReady
+    Decoder& getDecoder() { return decoder; }
 
   private:
     typedef sys::Monitor::ScopedLock Lock;
 
     typedef PollableQueue<Event> PollableEventQueue;
     typedef PollableQueue<EventFrame> PollableFrameQueue;
+    typedef std::map<ConnectionId, ConnectionPtr> ConnectionMap;
+
+    // FIXME aconway 2009-03-07: sort functions by thread
 
     // NB: A dummy Lock& parameter marks functions that must only be
     // called with Cluster::lock  locked.
@@ -118,33 +124,33 @@
     std::vector<std::string> getIds(Lock&) const;
     std::vector<Url> getUrls(Lock&) const;
 
-    // Make an offer if we can - called in deliver thread.
-    void makeOffer(const MemberId&, Lock&);
-
-    // Called in main thread from Broker destructor.
+    // == Called in main thread from Broker destructor.
     void brokerShutdown();
 
+    // == Called in deliverEventQueue thread
+    void deliveredEvent(const Event&); 
+
+    // == Called in deliverFrameQueue thread
+    void deliveredFrame(const EventFrame&); 
+
     // Cluster controls implement XML methods from cluster.xml.
-    // Called in deliverEventQueue thread.
     void updateRequest(const MemberId&, const std::string&, Lock&);
     void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, Lock&);
     void ready(const MemberId&, const std::string&, Lock&);
     void configChange(const MemberId&, const std::string& addresses, Lock& l);
     void messageExpired(const MemberId&, uint64_t, Lock& l);
     void shutdown(const MemberId&, Lock&);
-    // Helper, called by updateOffer.
-    void updateStart(const MemberId& updatee, const Url& url, Lock&);
-
-    // Used by cluster controls.
-    void stall(Lock&);
-    void unstall(Lock&);
-
-    // Handlers for pollable queues.
-    void deliveredEvent(const Event&); 
-    void deliveredFrame(const EventFrame&); 
 
+    // Helper functions
+    ConnectionPtr getConnection(const ConnectionId&, Lock&);
+    ConnectionVector getConnections(Lock&);
+    void updateStart(const MemberId& updatee, const Url& url, Lock&);
+    void makeOffer(const MemberId&, Lock&);
     void setReady(Lock&);
+    void memberUpdate(Lock&);
+    void setClusterId(const framing::Uuid&, Lock&);
 
+    // == Called in CPG dispatch thread
     void deliver( // CPG deliver callback. 
         cpg_handle_t /*handle*/,
         struct cpg_name *group,
@@ -153,7 +159,7 @@
         void* /*msg*/,
         int /*msg_len*/);
 
-    void deliver(const Event&);
+    void deliverEvent(const Event&);
     
     void configChange( // CPG config change callback.
         cpg_handle_t /*handle*/,
@@ -163,23 +169,21 @@
         struct cpg_address */*joined*/, int /*nJoined*/
     );
 
+    // == Called in management threads.
     virtual qpid::management::ManagementObject* GetManagementObject() const;
     virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
 
     void stopClusterNode(Lock&);
     void stopFullCluster(Lock&);
-    void memberUpdate(Lock&);
 
-    // Called in connection IO threads .
+    // == Called in connection IO threads .
     void checkUpdateIn(Lock&);
 
-    // Called in UpdateClient thread.
+    // == Called in UpdateClient thread.
     void updateOutDone();
     void updateOutError(const std::exception&);
     void updateOutDone(Lock&);
 
-    void setClusterId(const framing::Uuid&, Lock&);
-
     // Immutable members set on construction, never changed.
     ClusterSettings settings;
     broker::Broker& broker;
@@ -203,17 +207,23 @@
     PollableFrameQueue deliverFrameQueue;
     boost::shared_ptr<FailoverExchange> failoverExchange;
     Quorum quorum;
-    ConnectionMap connections;
- 
-    // Used only in deliverFrameQueue thread or in deliverEventQueue thread when stalled.
-    uint64_t frameId;
+    LockedConnectionMap localConnections;
 
     // Used only during initialization
     bool initialized;
 
-    // Remaining members are protected by lock
+    // Used only in deliverEventQueue thread or when stalled for update.
+    Decoder decoder;
+    bool discarding;
+    
+    // Remaining members are protected by lock.
+    // FIXME aconway 2009-03-06: Most of these members are also only used in
+    // deliverFrameQueue thread or during stall. Review and separate members
+    // that require a lock, drop lock when not needed.
+    // 
     mutable sys::Monitor lock;
 
+
     //    Local cluster state, cluster map
     enum {
         INIT,    ///< Initial state, no CPG messages received.
@@ -226,13 +236,13 @@
         LEFT     ///< Final state, left the cluster.
     } state;
 
-    uint64_t eventId;
+    ConnectionMap connections;
+    uint64_t frameId;
     ClusterMap map;
     ClusterMap::Set elders;
     size_t lastSize;
     bool lastBroker;
     sys::Thread updateThread;
-    sys::Runnable* updateTask;
     boost::optional<ClusterMap> updatedMap;
 
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=751557&r1=751556&r2=751557&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Sun Mar  8 23:52:35 2009
@@ -150,7 +150,8 @@
 void Connection::deliveredFrame(const EventFrame& f) {
     assert(!catchUp);
     currentChannel = f.frame.getChannel(); 
-    if (!framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol.
+    if (f.frame.getBody()       // frame can be emtpy with just readCredit
+        && !framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol.
         && !checkUnsupported(*f.frame.getBody())) // Unsupported operation.
     {
         if (f.type == DATA) // incoming data frames to broker::Connection
@@ -234,29 +235,6 @@
     return size;
 }
 
-// Decode a data event, a read buffer that has been delivered by the cluster.
-void Connection::decode(const EventHeader& eh, const void* data) {
-    assert(eh.getType() == DATA); // Only handle connection data events.
-    const char* cp = static_cast<const char*>(data);
-    Buffer buf(const_cast<char*>(cp), eh.getSize());
-    if (clusterDecoder.decode(buf)) {  // Decoded a frame
-        AMQFrame frame(clusterDecoder.getFrame());
-        while (clusterDecoder.decode(buf)) {
-            cluster.connectionFrame(EventFrame(eh, frame));
-            frame = clusterDecoder.getFrame();
-        }
-        // Set read-credit on the last frame ending in this event.
-        // Credit will be given when this frame is processed.
-        cluster.connectionFrame(EventFrame(eh, frame, 1)); 
-    }
-    else {
-        // We must give 1 unit read credit per event.
-        // This event does not complete any frames so 
-        // we give read credit directly.
-        giveReadCredit(1);
-    }    
-}
-
 broker::SessionState& Connection::sessionState() {
     return *connection.getChannel(currentChannel).getSession();
 }
@@ -297,12 +275,13 @@
     QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadowId);
     self = shadowId;
     connection.setUserId(username);
-    clusterDecoder.setFragment(fragment.data(), fragment.size());
+    // OK to use decoder here because we are stalled for update.
+    cluster.getDecoder().get(self).setFragment(fragment.data(), fragment.size());
 }
 
-void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t eventId, uint64_t frameId) {
+void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameId) {
     QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
-    cluster.updateInDone(ClusterMap(joiners, members), eventId, frameId);
+    cluster.updateInDone(ClusterMap(joiners, members), frameId);
     self.second = 0;        // Mark this as completed update connection.
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=751557&r1=751556&r2=751557&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Sun Mar  8 23:52:35 2009
@@ -34,8 +34,8 @@
 #include "qpid/sys/AtomicValue.h"
 #include "qpid/sys/ConnectionInputHandler.h"
 #include "qpid/sys/ConnectionOutputHandler.h"
-#include "qpid/framing/FrameDecoder.h"
 #include "qpid/framing/SequenceNumber.h"
+#include "qpid/framing/FrameDecoder.h"
 
 #include <iosfwd>
 
@@ -103,9 +103,6 @@
     // ConnectionCodec methods - called by IO layer with a read buffer.
     size_t decode(const char* buffer, size_t size);
 
-    // Decode a data event, a read buffer that has been delivered by the cluster.
-    void decode(const EventHeader& eh, const void* data);
-
     // Called for data delivered from the cluster.
     void deliveredFrame(const EventFrame&);
 
@@ -123,7 +120,7 @@
     
     void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment);
 
-    void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t eventId, uint64_t frameId);
+    void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameId);
 
     void deliveryRecord(const std::string& queue,
                         const framing::SequenceNumber& position,
@@ -153,8 +150,6 @@
 
     void giveReadCredit(int credit);
 
-    framing::FrameDecoder& getDecoder() { return clusterDecoder; }
-
   private:
     struct NullFrameHandler : public framing::FrameHandler {
         void handle(framing::AMQFrame&) {}
@@ -179,7 +174,6 @@
     WriteEstimate writeEstimate;
     OutputInterceptor output;
     framing::FrameDecoder localDecoder;
-    framing::FrameDecoder clusterDecoder;
     broker::Connection connection;
     framing::SequenceNumber deliverSeq;
     framing::ChannelId currentChannel;

Added: qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp?rev=751557&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp Sun Mar  8 23:52:35 2009
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Decoder.h"
+#include "EventFrame.h"
+#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/framing/AMQFrame.h"
+
+
+namespace qpid {
+namespace cluster {
+
+void Decoder::decode(const EventHeader& eh, const char* data) {
+    assert(eh.getType() == DATA); // Only handle connection data events.
+    const char* cp = static_cast<const char*>(data);
+    framing::Buffer buf(const_cast<char*>(cp), eh.getSize());
+    framing::FrameDecoder& decoder = map[eh.getConnectionId()];
+    if (decoder.decode(buf)) {  // Decoded a frame
+        framing::AMQFrame frame(decoder.getFrame());
+        while (decoder.decode(buf)) {
+            process(EventFrame(eh, frame));
+            frame = decoder.getFrame();
+        }
+        // Set read-credit on the last frame ending in this event.
+        // Credit will be given when this frame is processed.
+        process(EventFrame(eh, frame, 1)); 
+    }
+    else {
+        // We must give 1 unit read credit per event.
+        // This event does not complete any frames so 
+        // send an empty frame with the read credit.
+        process(EventFrame(EventHeader(), framing::AMQFrame(), 1));
+    }    
+}
+
+void Decoder::process(const EventFrame& ef) {
+    if (ef.frame.getMethod() && ef.frame.getMethod()->isA<framing::ClusterConnectionDeliverCloseBody>())
+        map.erase(ef.connectionId);
+    callback(ef);
+}
+
+}} // namespace qpid::cluster

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h?rev=751557&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h Sun Mar  8 23:52:35 2009
@@ -0,0 +1,57 @@
+#ifndef QPID_CLUSTER_DECODER_H
+#define QPID_CLUSTER_DECODER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+#include "qpid/framing/FrameDecoder.h"
+#include <boost/function.hpp>
+#include <map>
+
+namespace qpid {
+namespace cluster {
+
+class EventFrame;
+class EventHeader;
+
+/**
+ * A map of decoders for connections.
+ */
+class Decoder
+{
+  public:
+    typedef boost::function<void(const EventFrame&)> FrameHandler;
+
+    Decoder(FrameHandler fh) : callback(fh) {}
+    void decode(const EventHeader& eh, const char* data);
+    void erase(const ConnectionId&);
+    framing::FrameDecoder& get(const ConnectionId& c) { return map[c]; }
+
+  private:
+    typedef std::map<ConnectionId, framing::FrameDecoder> Map;
+    Map map;
+    void process(const EventFrame&);
+    FrameHandler callback;
+};
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_DECODER_H*/

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp?rev=751557&r1=751556&r2=751557&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp Sun Mar  8 23:52:35 2009
@@ -44,7 +44,7 @@
     ;
 
 EventHeader::EventHeader(EventType t, const ConnectionId& c,  size_t s)
-    : type(t), connectionId(c), size(s), id(0) {}
+    : type(t), connectionId(c), size(s) {}
 
 
 Event::Event() {}
@@ -128,7 +128,7 @@
 }
 
 std::ostream& operator << (std::ostream& o, const EventHeader& e) {
-    o << "Event[id=" << e.getId() << " connection=" << e.getConnectionId() << " " << e.getType() << " " << e.getSize() << " bytes]";
+    o << "Event[" << e.getConnectionId() << " " << e.getType() << " " << e.getSize() << " bytes]";
     return o;
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h?rev=751557&r1=751556&r2=751557&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h Sun Mar  8 23:52:35 2009
@@ -57,11 +57,9 @@
     /** Size of header + payload. */ 
     size_t getStoreSize() { return size + HEADER_SIZE; }
 
-    uint64_t getId() const { return id; }
-    void setId(uint64_t n) { id = n; }
-
     bool isCluster() const { return connectionId.getNumber() == 0; }
     bool isConnection() const { return connectionId.getNumber() != 0; }
+    bool isControl() const { return type == CONTROL; }
 
   protected:
     static const size_t HEADER_SIZE;
@@ -69,7 +67,6 @@
     EventType type;
     ConnectionId connectionId;
     size_t size;
-    uint64_t id;
 };
 
 /**

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp?rev=751557&r1=751556&r2=751557&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp Sun Mar  8 23:52:35 2009
@@ -24,16 +24,16 @@
 namespace qpid {
 namespace cluster {
 
-EventFrame::EventFrame() : eventId(0) {}
+EventFrame::EventFrame() {}
 
 EventFrame::EventFrame(const EventHeader& e, const framing::AMQFrame& f, int rc)
-    : connectionId(e.getConnectionId()), frame(f), eventId(e.getId()), readCredit(rc), type(e.getType())
+    : connectionId(e.getConnectionId()), frame(f), readCredit(rc), type(e.getType())
 {
     QPID_LATENCY_INIT(frame);
 }
 
 std::ostream& operator<<(std::ostream& o, const EventFrame& e) {
-    return o << e.frame << "(from event " << e.eventId << " read-credit=" << e.readCredit << ")";
+    return o << e.frame  << " " << e.type << " " << e.connectionId << " read-credit=" << e.readCredit;
 }
 
 }} // namespace qpid::cluster

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h?rev=751557&r1=751556&r2=751557&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h Sun Mar  8 23:52:35 2009
@@ -47,16 +47,8 @@
     bool isLastInEvent() const { return readCredit; }
 
 
-    // True if this frame follows immediately after frame e. 
-    bool follows(const EventFrame& e) const {
-        return eventId == e.eventId || (eventId == e.eventId+1 && e.readCredit);
-    }
-
-    bool operator<(const EventFrame& e) const { return eventId < e.eventId; }
-    
     ConnectionId connectionId;
     framing::AMQFrame frame;   
-    uint64_t eventId;
     int readCredit; ///< last frame in an event, give credit when processed.
     EventType type;
 };

Added: qpid/trunk/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h?rev=751557&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h Sun Mar  8 23:52:35 2009
@@ -0,0 +1,62 @@
+#ifndef QPID_CLUSTER_LOCKEDCONNECTIONMAP_H
+#define QPID_CLUSTER_LOCKEDCONNECTIONMAP_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+#include "qpid/sys/Mutex.h"
+#include "Connection.h"
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * Thread safe map of connections.
+ */
+class LockedConnectionMap
+{
+  public:
+    void insert(const ConnectionPtr& c) {
+        sys::Mutex::ScopedLock l(lock);
+        map[c->getId()] = c;
+    }
+    
+    ConnectionPtr getErase(const ConnectionId& c) {
+        sys::Mutex::ScopedLock l(lock);
+        Map::iterator i = map.find(c);
+        if (i != map.end()) {
+            ConnectionPtr cp = i->second;
+            map.erase(i);
+            return cp;
+        }
+        else
+            return 0;
+    }
+
+  private:
+    typedef std::map<ConnectionId, ConnectionPtr> Map;
+    mutable sys::Mutex lock;
+    Map map;
+};
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_LOCKEDCONNECTIONMAP_H*/

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=751557&r1=751556&r2=751557&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Sun Mar  8 23:52:35 2009
@@ -22,6 +22,7 @@
 #include "Cluster.h"
 #include "ClusterMap.h"
 #include "Connection.h"
+#include "Decoder.h"
 #include "qpid/client/SessionBase_0_10Access.h" 
 #include "qpid/client/ConnectionAccess.h" 
 #include "qpid/broker/Broker.h"
@@ -86,14 +87,14 @@
 // TODO aconway 2008-09-24: optimization: update connections/sessions in parallel.
 
 UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const Url& url,
-                           broker::Broker& broker, const ClusterMap& m, uint64_t eventId_, uint64_t frameId_, 
-                           const Cluster::Connections& cons,
+                           broker::Broker& broker, const ClusterMap& m, uint64_t frameId_, 
+                           const Cluster::ConnectionVector& cons, Decoder& decoder_,
                            const boost::function<void()>& ok,
                            const boost::function<void(const std::exception&)>& fail,
                            const client::ConnectionSettings& cs
 )
     : updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m),
-      eventId(eventId_), frameId(frameId_), connections(cons), 
+      frameId(frameId_), connections(cons), decoder(decoder_),
       connection(catchUpConnection()), shadowConnection(catchUpConnection()),
       done(ok), failed(fail), connectionSettings(cs)
 {
@@ -130,7 +131,6 @@
 
     ClusterConnectionMembershipBody membership;
     map.toMethodBody(membership);
-    membership.setEventId(eventId);
     membership.setFrameId(frameId);
     AMQFrame frame(membership);
     client::ConnectionAccess::getImpl(connection)->handle(frame);
@@ -232,7 +232,8 @@
     connectionSettings.maxFrameSize = bc.getFrameMax();
     shadowConnection.open(updateeUrl, connectionSettings);
     bc.eachSessionHandler(boost::bind(&UpdateClient::updateSession, this, _1));
-    std::pair<const char*, size_t> fragment = updateConnection->getDecoder().getFragment();
+    // Safe to use decoder here because we are stalled for update.
+    std::pair<const char*, size_t> fragment = decoder.get(updateConnection->getId()).getFragment();
     ClusterConnectionProxy(shadowConnection).shadowReady(
         updateConnection->getId().getMember(),
         updateConnection->getId().getNumber(),

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h?rev=751557&r1=751556&r2=751557&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h Sun Mar  8 23:52:35 2009
@@ -46,6 +46,7 @@
 class DeliveryRecord;
 class SessionState;
 class SemanticState;
+class Decoder;
 
 } // namespace broker
 
@@ -54,6 +55,7 @@
 class Cluster;
 class Connection;
 class ClusterMap;
+class Decoder;
 
 /**
  * A client that updates the contents of a local broker to a remote one using AMQP.
@@ -63,8 +65,8 @@
     static const std::string UPDATE; // Name for special update queue and exchange.
     
     UpdateClient(const MemberId& updater, const MemberId& updatee, const Url&,
-                 broker::Broker& donor, const ClusterMap& map, uint64_t eventId, uint64_t frameId,
-                 const std::vector<boost::intrusive_ptr<Connection> >& ,
+                 broker::Broker& donor, const ClusterMap& map, uint64_t frameId,
+                 const std::vector<boost::intrusive_ptr<Connection> >&, Decoder&,
                  const boost::function<void()>& done,
                  const boost::function<void(const std::exception&)>& fail,
                  const client::ConnectionSettings& 
@@ -92,9 +94,9 @@
     Url updateeUrl;
     broker::Broker& updaterBroker;
     ClusterMap map;
-    uint64_t eventId;
     uint64_t frameId;
     std::vector<boost::intrusive_ptr<Connection> > connections;
+    Decoder& decoder;
     client::Connection connection, shadowConnection;
     client::AsyncSession session, shadowSession;
     boost::function<void()> done;

Modified: qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp?rev=751557&r1=751556&r2=751557&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp Sun Mar  8 23:52:35 2009
@@ -35,13 +35,14 @@
     subchannel=0;
     channel=0;
     encodedSizeCache = 0;
+    clusterId = 0;
 }
 
 AMQFrame::AMQFrame(const boost::intrusive_ptr<AMQBody>& b) : body(b) { init(); }
 
 AMQFrame::AMQFrame(const AMQBody& b) : body(b.clone()) { init(); }
 
-AMQFrame::~AMQFrame() {}
+AMQFrame::~AMQFrame() { init(); }
 
 AMQBody* AMQFrame::getBody() {
     // Non-const AMQBody* may be used to modify the body.

Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=751557&r1=751556&r2=751557&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Sun Mar  8 23:52:35 2009
@@ -132,7 +132,6 @@
     <control name="membership" code="0x21" label="Cluster membership details.">
       <field name="joiners" type="map"/> <!-- member-id -> URL -->
       <field name="members" type="map"/> <!-- member-id -> state -->
-      <field name="event-id" type="uint64"/>> <!-- Event id counter value -->
       <field name="frame-id" type="uint64"/>> <!-- Frame id counter value -->
     </control>
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org