You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/03/03 00:30:09 UTC

svn commit: r749473 - in /qpid/trunk/qpid/cpp: src/ src/qpid/cluster/ src/qpid/framing/ src/tests/ xml/

Author: aconway
Date: Mon Mar  2 23:30:08 2009
New Revision: 749473

URL: http://svn.apache.org/viewvc?rev=749473&view=rev
Log:

Replicate connection decoder fragments to new members.

Refactoring:
 - Merge Decoder into ConnectionMap.
 - Process cluster controls in event queue thread.
 - Use counter not pointer for connection ID, avoid re-use.
 - Do all processing in event queue thread to avoid races
   (temporary pending performance measurements)

Removed:
    qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionDecoder.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionDecoder.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h
Modified:
    qpid/trunk/qpid/cpp/src/cluster.mk
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/types.h
    qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.cpp
    qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.h
    qpid/trunk/qpid/cpp/src/tests/ClusterFixture.cpp
    qpid/trunk/qpid/cpp/src/tests/FrameDecoder.cpp
    qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: qpid/trunk/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/cluster.mk?rev=749473&r1=749472&r2=749473&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ qpid/trunk/qpid/cpp/src/cluster.mk Mon Mar  2 23:30:08 2009
@@ -53,10 +53,6 @@
   qpid/cluster/ConnectionMap.cpp		\
   qpid/cluster/Cpg.cpp				\
   qpid/cluster/Cpg.h				\
-  qpid/cluster/Decoder.cpp			\
-  qpid/cluster/Decoder.h			\
-  qpid/cluster/ConnectionDecoder.cpp		\
-  qpid/cluster/ConnectionDecoder.h		\
   qpid/cluster/Dispatchable.h			\
   qpid/cluster/UpdateClient.cpp			\
   qpid/cluster/UpdateClient.h			\

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=749473&r1=749472&r2=749473&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Mon Mar  2 23:30:08 2009
@@ -22,6 +22,7 @@
 #include "UpdateClient.h"
 #include "FailoverExchange.h"
 
+#include "qpid/assert.h"
 #include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h"
 #include "qmf/org/apache/qpid/cluster/Package.h"
 #include "qpid/broker/Broker.h"
@@ -91,7 +92,7 @@
     cpg(*this),
     name(settings.name),
     myUrl(settings.url.empty() ? Url() : Url(settings.url)),
-    myId(cpg.self()),
+    self(cpg.self()),
     readMax(settings.readMax),
     writeEstimate(settings.writeEstimate),
     mcast(cpg, poller, boost::bind(&Cluster::leave, this)),
@@ -104,8 +105,7 @@
                       boost::bind(&Cluster::leave, this),
                       "Error delivering frames",
                       poller),
-    decoder(boost::bind(&PollableFrameQueue::push, &deliverFrameQueue, _1), connections),
-    expiryPolicy(new ExpiryPolicy(boost::bind(&Cluster::isLeader, this), mcast, myId, broker.getTimer())),
+    expiryPolicy(new ExpiryPolicy(boost::bind(&Cluster::isLeader, this), mcast, self, broker.getTimer())),
     frameId(0),
     initialized(false),
     state(INIT),
@@ -213,7 +213,7 @@
     framing::Buffer buf(static_cast<char*>(msg), msg_len);
     Event e(Event::decodeCopy(from, buf));
     e.setSequence(sequence++);
-    if (from == myId)  // Record self-deliveries for flow control.
+    if (from == self)  // Record self-deliveries for flow control.
         mcast.selfDeliver(e);
     deliver(e);
 }
@@ -227,42 +227,33 @@
 // Handler for deliverEventQueue
 void Cluster::deliveredEvent(const Event& e) {
     QPID_LATENCY_RECORD("delivered event queue", e);
-    Buffer buf(const_cast<char*>(e.getData()), e.getSize());
-    if (e.getType() == CONTROL) {
-        AMQFrame frame;
-        while (frame.decode(buf)) {
-            // Check for deliver close here so we can erase the
-            // connection decoder safely in this thread.
-            if (frame.getMethod()->isA<ClusterConnectionDeliverCloseBody>())
-                    decoder.erase(e.getConnectionId());
-            deliverFrameQueue.push(EventFrame(e, frame));
-        }
+    Mutex::ScopedLock l(lock);
+    if (e.isCluster()) {        // Cluster control, process in this thread.
+        AMQFrame frame(e.getFrame());
+        ClusterDispatcher dispatch(*this, e.getConnectionId().getMember(), l);
+        if (!framing::invoke(dispatch, *frame.getBody()).wasHandled())
+            throw Exception(QPID_MSG("Invalid cluster control"));
     }
-    else if (e.getType() == DATA)
-        decoder.decode(e, e.getData());
+    else if (state >= CATCHUP) { // Connection frame, push onto deliver queue.
+        if (e.getType() == CONTROL)
+            connectionFrame(EventFrame(e, e.getFrame()));
+        else
+            connections.decode(e, e.getData());
+    }
+    else                        // connection frame && state < CATCHUP. Drop.
+        QPID_LOG(trace, *this << " DROP: " << e);
 }
 
 // Handler for deliverFrameQueue
 void Cluster::deliveredFrame(const EventFrame& e) {
-    Mutex::ScopedLock l(lock);
-    const_cast<AMQFrame&>(e.frame).setClusterId(frameId++);
+    Mutex::ScopedLock l(lock);  // TODO aconway 2009-03-02: don't need this lock?
+    assert(!e.isCluster());     // Only connection frames on this queue.
     QPID_LOG(trace, *this << " DLVR: " << e);
-    QPID_LATENCY_RECORD("delivered frame queue", e.frame);
-    if (e.isCluster()) {        // Cluster control frame
-        ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l);
-        if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled())
-            throw Exception(QPID_MSG("Invalid cluster control"));
-    }
-    else {                      // Connection frame.
-        if (state <= UPDATEE) {
-            QPID_LOG(trace, *this << " DROP: " << e);
-            return;
-        }
-        boost::intrusive_ptr<Connection> connection = connections.get(e.connectionId);
-        if (connection)         // Ignore frames to closed local connections.
-            connection->deliveredFrame(e);
-    }
-    QPID_LATENCY_RECORD("processed", e.frame);
+    if (e.type == DATA)    // Sequence number to identify data frames.
+        const_cast<AMQFrame&>(e.frame).setClusterId(frameId++);
+    boost::intrusive_ptr<Connection> connection = connections.get(e.connectionId);
+    if (connection)         // Ignore frames to closed local connections.
+        connection->deliveredFrame(e);
 }
   
 struct AddrList {
@@ -310,7 +301,7 @@
     std::string addresses;
     for (cpg_address* p = current; p < current+nCurrent; ++p) 
         addresses.append(MemberId(*p).str());
-    deliver(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), myId));
+    deliver(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), self));
 }
 
 void Cluster::setReady(Lock&) {
@@ -323,7 +314,7 @@
     bool memberChange = map.configChange(addresses);
     if (state == LEFT) return;
     
-    if (!map.isAlive(myId)) {  // Final config change.
+    if (!map.isAlive(self)) {  // Final config change.
         leave(l);
         return;
     }
@@ -332,16 +323,16 @@
         if (map.aliveCount() == 1) {
             setClusterId(true);
             setReady(l);
-            map = ClusterMap(myId, myUrl, true);
+            map = ClusterMap(self, myUrl, true);
             memberUpdate(l);
             QPID_LOG(notice, *this << " first in cluster");
         }
         else {                  // Joining established group.
             state = JOINER;
             QPID_LOG(info, *this << " joining cluster: " << map);
-            mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), myId);
+            mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
             elders = map.getAlive();
-            elders.erase(myId);
+            elders.erase(self);
             broker.getLinks().setPassive(true);
         }
     }
@@ -361,7 +352,7 @@
     if (state == READY && map.isJoiner(id)) {
         state = OFFER;
         QPID_LOG(info, *this << " send update-offer to " << id);
-        mcast.mcastControl(ClusterUpdateOfferBody(ProtocolVersion(), id, clusterId), myId);
+        mcast.mcastControl(ClusterUpdateOfferBody(ProtocolVersion(), id, clusterId), self);
     }
 }
 
@@ -388,17 +379,29 @@
 void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
     if (map.ready(id, Url(url))) 
         memberUpdate(l);
-    if (state == CATCHUP && id == myId) {
+    if (state == CATCHUP && id == self) {
         setReady(l);
         QPID_LOG(notice, *this << " caught up, active cluster member");
     }
 }
 
+void Cluster::stall(Lock&) {
+    // Stop processing the deliverEventQueue in order to send or
+    // receive an update.
+    deliverEventQueue.stop();
+}
+
+void Cluster::unstall(Lock&) {
+    // Resume processing the deliverEventQueue after sending or
+    // receiving an update.
+    deliverEventQueue.start();
+}
+
 void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uuid& uuid, Lock& l) {
     if (state == LEFT) return;
     MemberId updatee(updateeInt);
     boost::optional<Url> url = map.updateOffer(updater, updatee);
-    if (updater == myId) {
+    if (updater == self) {
         assert(state == OFFER);
         if (url) {              // My offer was first.
             updateStart(updatee, *url, l);
@@ -409,29 +412,29 @@
             makeOffer(map.firstJoiner(), l); // Maybe make another offer.
         }
     }
-    else if (updatee == myId && url) {
+    else if (updatee == self && url) {
         assert(state == JOINER);
         setClusterId(uuid);
         state = UPDATEE;
         QPID_LOG(info, *this << " receiving update from " << updater);
-        deliverFrameQueue.stop();
+        stall(l);
         checkUpdateIn(l);
     }
 }
 
-void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock&) {
+void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) {
     if (state == LEFT) return;
     assert(state == OFFER);
     state = UPDATER;
     QPID_LOG(info, *this << " stall for update to " << updatee << " at " << url);
-    deliverFrameQueue.stop();
+    stall(l);
     if (updateThread.id()) updateThread.join(); // Join the previous updatethread.
     client::ConnectionSettings cs;
     cs.username = settings.username;
     cs.password = settings.password;
     cs.mechanism = settings.mechanism;
     updateThread = Thread(
-        new UpdateClient(myId, updatee, url, broker, map, frameId, connections.values(),
+        new UpdateClient(self, updatee, url, broker, map, frameId, connections.values(),
                          boost::bind(&Cluster::updateOutDone, this),
                          boost::bind(&Cluster::updateOutError, this, _1),
                          cs));
@@ -445,13 +448,13 @@
     checkUpdateIn(l);
 }
 
-void Cluster::checkUpdateIn(Lock& ) {
+void Cluster::checkUpdateIn(Lock& l) {
     if (state == UPDATEE && updatedMap) {
         map = *updatedMap;
-        mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), myId);
+        mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
         state = CATCHUP;
         QPID_LOG(info, *this << " received update, starting catch-up");
-        deliverFrameQueue.start();
+        unstall(l);
     }
 }
 
@@ -465,7 +468,7 @@
     assert(state == UPDATER);
     state = READY;
     mcast.release();
-    deliverFrameQueue.start();
+    unstall(l);
     makeOffer(map.firstJoiner(), l); // Try another offer
 }
 
@@ -490,7 +493,7 @@
         {
             _qmf::ArgsClusterStopClusterNode& iargs = (_qmf::ArgsClusterStopClusterNode&) args;
             stringstream stream;
-            stream << myId;
+            stream << self;
             if (iargs.i_brokerId == stream.str())
                 stopClusterNode(l);
         }
@@ -511,7 +514,7 @@
 
 void Cluster::stopFullCluster(Lock& ) {
     QPID_LOG(notice, *this << " shutting down cluster " << name);
-    mcast.mcastControl(ClusterShutdownBody(), myId);
+    mcast.mcastControl(ClusterShutdownBody(), self);
 }
 
 void Cluster::memberUpdate(Lock& l) {
@@ -522,12 +525,12 @@
     failoverExchange->setUrls(urls);
 
     if (size == 1 && lastSize > 1 && state >= CATCHUP) { 
-        QPID_LOG(info, *this << " last broker standing, update queue policies");
+        QPID_LOG(notice, *this << " last broker standing, update queue policies");
         lastBroker = true;
         broker.getQueues().updateQueueClusterState(true);
     }
     else if (size > 1 && lastBroker) {
-        QPID_LOG(info, *this << " last broker standing joined by " << size-1 << " replicas, updating queue policies" << size);
+        QPID_LOG(notice, *this << " last broker standing joined by " << size-1 << " replicas, updating queue policies" << size);
         lastBroker = false;
         broker.getQueues().updateQueueClusterState(false);
     }
@@ -549,17 +552,25 @@
         mgmtObject->set_memberIDs(idstr);
     }
 
-    // Close connections belonging to members that have now been excluded
-    connections.update(myId, map);
+    // Generate a deliver-close control frame for connections
+    // belonging to defunct members, so they will be erased in the
+    // deliverFrameQueue thread.
+    ConnectionMap::Vector c = connections.values();
+    for (ConnectionMap::Vector::iterator i = c.begin(); i != c.end(); ++i) {
+        ConnectionId cid = (*i)->getId();
+        MemberId mid = cid.getMember();
+        if (mid != self && !map.isMember(mid))
+            connectionFrame(EventFrame(EventHeader(CONTROL, cid), AMQFrame(ClusterConnectionDeliverCloseBody())));
+    }
 }
 
 std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
     static const char* STATE[] = { "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT" };
-    return o << cluster.myId << "(" << STATE[cluster.state] << ")";
+    return o << cluster.self << "(" << STATE[cluster.state] << ")";
 }
 
 MemberId Cluster::getId() const {
-    return myId;            // Immutable, no need to lock.
+    return self;            // Immutable, no need to lock.
 }
 
 broker::Broker& Cluster::getBroker() const {
@@ -578,7 +589,7 @@
     clusterId = uuid;
     if (mgmtObject) {
         stringstream stream;
-        stream << myId;
+        stream << self;
         mgmtObject->set_clusterID(clusterId.str());
         mgmtObject->set_memberID(stream.str());
     }
@@ -589,4 +600,11 @@
     expiryPolicy->deliverExpire(id);
 }
 
+void Cluster::connectionFrame(const EventFrame& frame) {
+    // FIXME aconway 2009-03-02: bypassing deliverFrameQueue to avoid race condition.
+    // Measure performance impact, restore with better locking.
+    // deliverFrameQueue.push(frame);
+    deliveredFrame(frame);
+}
+
 }} // namespace qpid::cluster

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=749473&r1=749472&r2=749473&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Mon Mar  2 23:30:08 2009
@@ -30,7 +30,6 @@
 #include "NoOpConnectionOutputHandler.h"
 #include "PollerDispatch.h"
 #include "Quorum.h"
-#include "Decoder.h"
 #include "PollableQueue.h"
 #include "ExpiryPolicy.h"
 
@@ -102,7 +101,10 @@
     size_t getWriteEstimate() { return writeEstimate; }
 
     bool isLeader() const;       // Called in deliver thread.
-    
+
+    // Called by Connection in deliver event thread with decoded connection data frames.
+    void connectionFrame(const EventFrame&); 
+
   private:
     typedef sys::Monitor::ScopedLock Lock;
 
@@ -125,7 +127,7 @@
     void brokerShutdown();
 
     // Cluster controls implement XML methods from cluster.xml.
-    // Called in deliver thread.
+    // Called in deliveredEvent thread.
     // 
     void updateRequest(const MemberId&, const std::string&, Lock&);
     void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, Lock&);
@@ -134,6 +136,10 @@
     void messageExpired(const MemberId&, uint64_t, Lock& l);
     void shutdown(const MemberId&, Lock&);
 
+    // Used by cluster controls.
+    void stall(Lock&);
+    void unstall(Lock&);
+
     // Handlers for pollable queues.
     void deliveredEvent(const Event&); 
     void deliveredFrame(const EventFrame&); 
@@ -141,6 +147,10 @@
     // Helper, called in deliver thread.
     void updateStart(const MemberId& updatee, const Url& url, Lock&);
 
+    // Called in event deliver thread to check for update status.
+    bool isUpdateComplete(const EventFrame&);
+    bool isUpdateComplete();
+
     void setReady(Lock&);
 
     void deliver( // CPG deliver callback. 
@@ -186,7 +196,7 @@
     Cpg cpg;
     const std::string name;
     Url myUrl;
-    const MemberId myId;
+    const MemberId self;
     const size_t readMax;
     const size_t writeEstimate;
     framing::Uuid clusterId;
@@ -201,9 +211,6 @@
     boost::shared_ptr<FailoverExchange> failoverExchange;
     Quorum quorum;
 
-    // Used only in deliverdEvent thread
-    Decoder decoder;
-
     // Used only in deliveredFrame thread
     ClusterMap::Set elders;
     boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=749473&r1=749472&r2=749473&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Mon Mar  2 23:30:08 2009
@@ -40,6 +40,7 @@
 #include "qpid/framing/ConnectionCloseOkBody.h"
 #include "qpid/log/Statement.h"
 #include "qpid/sys/LatencyMetric.h"
+#include "qpid/sys/AtomicValue.h"
 
 #include <boost/current_function.hpp>
 
@@ -58,19 +59,22 @@
 
 NoOpConnectionOutputHandler Connection::discardHandler;
 
-// Shadow connections
-Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
-                       const std::string& wrappedId, ConnectionId myId)
-    : cluster(c), self(myId), catchUp(false), output(*this, out),
-      connection(&output, cluster.getBroker(), wrappedId), expectProtocolHeader(false),
+namespace {
+sys::AtomicValue<uint64_t> idCounter;
+}
+
+// Shadow connection
+Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& logId, const ConnectionId& id)
+    : cluster(c), self(id), catchUp(false), output(*this, out),
+      connection(&output, cluster.getBroker(), logId), expectProtocolHeader(false),
       mcastFrameHandler(cluster.getMulticast(), self)
 { init(); }
 
-// Local connections
+// Local connection
 Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
-                       const std::string& wrappedId, MemberId myId, bool isCatchUp, bool isLink)
-    : cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out),
-      connection(&output, cluster.getBroker(), wrappedId, isLink, catchUp ? ++catchUpId : 0),
+                       const std::string& logId, MemberId member, bool isCatchUp, bool isLink)
+    : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), output(*this, out),
+      connection(&output, cluster.getBroker(), logId, isLink, catchUp ? ++catchUpId : 0),
       expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self)
 { init(); }
 
@@ -149,12 +153,9 @@
     if (!framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol.
         && !checkUnsupported(*f.frame.getBody())) // Unsupported operation.
     {
-        // FIXME aconway 2009-02-24: Using the DATA/CONTROL
-        // distinction to distinguish incoming vs. outgoing frames is
-        // very unclear.
         if (f.type == DATA) // incoming data frames to broker::Connection
             connection.received(const_cast<AMQFrame&>(f.frame)); 
-        else {                    // outgoing data frame, send via SessionState
+        else {                    // frame control, send frame via SessionState
             broker::SessionState* ss = connection.getChannel(f.frame.getChannel()).getSession();
             if (ss) ss->out(const_cast<AMQFrame&>(f.frame));
         }
@@ -200,12 +201,12 @@
     connection.closed();
 }
 
-// Decode data from local clients.
+// ConnectionCodec::decode receives read buffers from directly-connected clients.
 size_t Connection::decode(const char* buffer, size_t size) {
     if (catchUp) {  // Handle catch-up locally.
         Buffer buf(const_cast<char*>(buffer), size);
         while (localDecoder.decode(buf))
-            received(localDecoder.frame);
+            received(localDecoder.getFrame());
     }
     else {                      // Multicast local connections.
         assert(isLocal());
@@ -233,6 +234,29 @@
     return size;
 }
 
+// Decode a data event, a read buffer that has been delivered by the cluster.
+void Connection::decode(const EventHeader& eh, const void* data) {
+    assert(eh.getType() == DATA); // Only handle connection data events.
+    const char* cp = static_cast<const char*>(data);
+    Buffer buf(const_cast<char*>(cp), eh.getSize());
+    if (clusterDecoder.decode(buf)) {  // Decoded a frame
+        AMQFrame frame(clusterDecoder.getFrame());
+        while (clusterDecoder.decode(buf)) {
+            cluster.connectionFrame(EventFrame(eh, frame));
+            frame = clusterDecoder.getFrame();
+        }
+        // Set read-credit on the last frame ending in this event.
+        // Credit will be given when this frame is processed.
+        cluster.connectionFrame(EventFrame(eh, frame, 1)); 
+    }
+    else {
+        // We must give 1 unit read credit per event.
+        // This event does not complete any frames so 
+        // we give read credit directly.
+        giveReadCredit(1);
+    }    
+}
+
 broker::SessionState& Connection::sessionState() {
     return *connection.getChannel(currentChannel).getSession();
 }
@@ -267,11 +291,12 @@
     QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId());
 }
     
-void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username) {
-    ConnectionId shadow = ConnectionId(memberId, connectionId);
-    QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadow);
-    self = shadow;
+void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username, const string& fragment) {
+    ConnectionId shadowId = ConnectionId(memberId, connectionId);
+    QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadowId);
+    self = shadowId;
     connection.setUserId(username);
+    clusterDecoder.setFragment(fragment.data(), fragment.size());
 }
 
 void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameId) {
@@ -281,7 +306,7 @@
 }
 
 bool Connection::isLocal() const {
-    return self.first == cluster.getId() && self.second == this;
+    return self.first == cluster.getId() && self.second;
 }
 
 bool Connection::isShadow() const {

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=749473&r1=749472&r2=749473&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Mon Mar  2 23:30:08 2009
@@ -64,10 +64,10 @@
   public:
     typedef sys::PollableQueue<EventFrame> PollableFrameQueue;
 
-    /** Local connection, use this in ConnectionId */
-    Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, MemberId, bool catchUp, bool isLink);
-    /** Shadow connection */
-    Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, ConnectionId);
+    /** Local connection. */
+    Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& logId, MemberId, bool catchUp, bool isLink);
+    /** Shadow connection. */
+    Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& logId, const ConnectionId& id);
     ~Connection();
     
     ConnectionId getId() const { return self; }
@@ -100,9 +100,12 @@
     /** Called if the connectors member has left the cluster */
     void left();
     
-    // ConnectionCodec methods
+    // ConnectionCodec methods - called by IO layer with a read buffer.
     size_t decode(const char* buffer, size_t size);
 
+    // Decode a data event, a read buffer that has been delivered by the cluster.
+    void decode(const EventHeader& eh, const void* data);
+
     // Called for data delivered from the cluster.
     void deliveredFrame(const EventFrame&);
 
@@ -118,7 +121,7 @@
                       const framing::SequenceNumber& received,
                       const framing::SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete);
     
-    void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username);
+    void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment);
 
     void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameId);
 
@@ -149,7 +152,9 @@
     void exchange(const std::string& encoded);
 
     void giveReadCredit(int credit);
-    
+
+    framing::FrameDecoder& getDecoder() { return clusterDecoder; }
+
   private:
     struct NullFrameHandler : public framing::FrameHandler {
         void handle(framing::AMQFrame&) {}
@@ -174,6 +179,7 @@
     WriteEstimate writeEstimate;
     OutputInterceptor output;
     framing::FrameDecoder localDecoder;
+    framing::FrameDecoder clusterDecoder;
     broker::Connection connection;
     framing::SequenceNumber deliverSeq;
     framing::ChannelId currentChannel;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp?rev=749473&r1=749472&r2=749473&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp Mon Mar  2 23:30:08 2009
@@ -46,16 +46,13 @@
 
 // Used for outgoing Link connections, we don't care.
 sys::ConnectionCodec*
-ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& id) {
-    return new ConnectionCodec(out, id, cluster, false, true);
-    //return next->create(out, id);
+ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& logId) {
+    return new ConnectionCodec(out, logId, cluster, false, true);
 }
 
-ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& cluster, bool catchUp, bool isLink)
-    : codec(out, id, isLink),
-      interceptor(new Connection(cluster, codec, id, cluster.getId(), catchUp, isLink)),
-      id(interceptor->getId()),
-      localId(id)
+ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& logId, Cluster& cluster, bool catchUp, bool isLink)
+    : codec(out, logId, isLink),
+      interceptor(new Connection(cluster, codec, logId, cluster.getId(), catchUp, isLink))
 {
     std::auto_ptr<sys::ConnectionInputHandler> ih(new ProxyInputHandler(interceptor));
     codec.setInputHandler(ih);

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h?rev=749473&r1=749472&r2=749473&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h Mon Mar  2 23:30:08 2009
@@ -56,7 +56,7 @@
         sys::ConnectionCodec* create(sys::OutputControl&, const std::string& id);
     };
 
-    ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& c, bool catchUp, bool isLink);
+    ConnectionCodec(sys::OutputControl& out, const std::string& logId, Cluster& c, bool catchUp, bool isLink);
     ~ConnectionCodec();
 
     // ConnectionCodec functions.
@@ -71,8 +71,6 @@
   private:
     amqp_0_10::Connection codec;
     boost::intrusive_ptr<cluster::Connection> interceptor;
-    cluster::ConnectionId id;
-    std::string localId;
 };
 
 }} // namespace qpid::cluster

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp?rev=749473&r1=749472&r2=749473&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp Mon Mar  2 23:30:08 2009
@@ -38,9 +38,9 @@
 
 void ConnectionMap::erase(const ConnectionId& id) {
     Lock l(lock);
-    Map::iterator i = map.find(id);
-    QPID_ASSERT(i != map.end());
-    map.erase(i);
+    size_t erased = map.erase(id);
+    assert(erased);
+    (void)erased;               // Avoid unused variable warnings.
 }
 
 ConnectionMap::ConnectionPtr ConnectionMap::get(const ConnectionId& id) {
@@ -61,13 +61,6 @@
     return i->second;
 }
 
-ConnectionMap::ConnectionPtr ConnectionMap::getLocal(const ConnectionId& id) {
-    Lock l(lock);
-    if (id.getMember() != cluster.getId()) return 0;
-    Map::const_iterator i = map.find(id);
-    return i == map.end() ? 0 : i->second;
-}
-
 ConnectionMap::Vector ConnectionMap::values() const {
     Lock l(lock);
     Vector result(map.size());
@@ -76,22 +69,16 @@
     return result;
 }
 
-void ConnectionMap::update(MemberId myId, const ClusterMap& cluster) {
-    Lock l(lock);
-    for (Map::iterator i = map.begin(); i != map.end(); ) {
-        MemberId member = i->first.getMember();
-        if (member != myId && !cluster.isMember(member)) { 
-            i->second->left();
-            map.erase(i++);
-        } else {
-            i++;
-        }
-    }
-}
-
 void ConnectionMap::clear() {
     Lock l(lock);
     map.clear();
 }
 
+void ConnectionMap::decode(const EventHeader& eh, const void* data) {
+    ConnectionPtr connection = get(eh.getConnectionId());
+    if (connection)
+        connection->decode(eh, data);
+}
+
+
 }} // namespace qpid::cluster

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h?rev=749473&r1=749472&r2=749473&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h Mon Mar  2 23:30:08 2009
@@ -60,18 +60,13 @@
      */ 
     ConnectionPtr get(const ConnectionId& id);
 
-    /** If ID is a local connection and in the map return it, else return 0 */
-    ConnectionPtr getLocal(const ConnectionId& id);
-        
     /** Get connections for sending an update. */
     Vector values() const;
 
-    /** Remove connections who's members are no longer in the cluster. Deliver thread. */
-    void update(MemberId myId, const ClusterMap& cluster); 
+    /** Decode a connection data event. */
+    void decode(const EventHeader& eh, const void* data);
 
-    
     void clear();
-
     size_t size() const;
 
   private:

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp?rev=749473&r1=749472&r2=749473&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp Mon Mar  2 23:30:08 2009
@@ -23,6 +23,7 @@
 #include "Cpg.h"
 #include "qpid/framing/Buffer.h"
 #include "qpid/framing/AMQFrame.h"
+#include "qpid/assert.h"
 #include <ostream>
 #include <iterator>
 #include <algorithm>
@@ -31,6 +32,7 @@
 namespace cluster {
 
 using framing::Buffer;
+using framing::AMQFrame;
 
 const size_t EventHeader::HEADER_SIZE =
     sizeof(uint8_t) +  // type
@@ -57,7 +59,7 @@
     type = (EventType)buf.getOctet();
     if(type != DATA && type != CONTROL)
         throw Exception("Invalid multicast event type");
-    connectionId = ConnectionId(m, reinterpret_cast<Connection*>(buf.getLongLong()));
+    connectionId = ConnectionId(m, buf.getLongLong());
     size = buf.getLong();
 #ifdef QPID_LATENCY_METRIC
     latency_metric_timestamp = buf.getLongLong();
@@ -93,7 +95,7 @@
 
 void EventHeader::encode(Buffer& b) const {
     b.putOctet(type);
-    b.putLongLong(reinterpret_cast<uint64_t>(connectionId.getPointer()));
+    b.putLongLong(connectionId.getNumber());
     b.putLong(size);
 #ifdef QPID_LATENCY_METRIC
     b.putLongLong(latency_metric_timestamp);
@@ -111,6 +113,14 @@
     return Buffer(const_cast<char*>(getData()), getSize());
 }
 
+AMQFrame Event::getFrame() const {
+    assert(type == CONTROL);
+    Buffer buf(*this);
+    AMQFrame frame;
+    QPID_ASSERT(frame.decode(buf));
+    return frame;
+}
+
 static const char* EVENT_TYPE_NAMES[] = { "data", "control" };
 
 std::ostream& operator << (std::ostream& o, EventType t) {

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h?rev=749473&r1=749472&r2=749473&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h Mon Mar  2 23:30:08 2009
@@ -24,6 +24,7 @@
 
 #include "types.h"
 #include "qpid/RefCountedBuffer.h"
+#include "qpid/framing/AMQFrame.h"
 #include "qpid/sys/LatencyMetric.h"
 #include <sys/uio.h>            // For iovec
 #include <iosfwd>
@@ -59,8 +60,8 @@
     uint64_t getSequence() const { return sequence; }
     void setSequence(uint64_t n) { sequence = n; }
 
-    bool isCluster() const { return connectionId.getPointer() == 0; }
-    bool isConnection() const { return connectionId.getPointer() != 0; }
+    bool isCluster() const { return connectionId.getNumber() == 0; }
+    bool isConnection() const { return connectionId.getNumber() != 0; }
 
   protected:
     static const size_t HEADER_SIZE;
@@ -97,6 +98,8 @@
     // Store including header
     char* getStore() { return store; }
     const char* getStore() const { return store; }
+
+    framing::AMQFrame getFrame() const;        
     
     operator framing::Buffer() const;
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h?rev=749473&r1=749472&r2=749473&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h Mon Mar  2 23:30:08 2009
@@ -42,8 +42,8 @@
 
     EventFrame(const EventHeader& e, const framing::AMQFrame& f, int rc=0);
 
-    bool isCluster() const { return !connectionId.getPointer(); }
-    bool isConnection() const { return connectionId.getPointer(); }
+    bool isCluster() const { return connectionId.getNumber() == 0; }
+    bool isConnection() const { return connectionId.getNumber() != 0; }
     bool isLastInEvent() const { return readCredit; }
 
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=749473&r1=749472&r2=749473&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Mon Mar  2 23:30:08 2009
@@ -95,7 +95,7 @@
     : updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m),
       frameId(frameId_), connections(cons), 
       connection(catchUpConnection()), shadowConnection(catchUpConnection()),
-      done(ok), failed(fail) 
+      done(ok), failed(fail), connectionSettings(cs)
 {
     connection.open(url, cs);
     session = connection.newSession("update_shared");
@@ -228,13 +228,15 @@
     shadowConnection = catchUpConnection();
 
     broker::Connection& bc = updateConnection->getBrokerConnection();
-    // FIXME aconway 2008-10-20: What authentication info to use on reconnect?
-    shadowConnection.open(updateeUrl, bc.getUserId(), ""/*password*/, "/"/*vhost*/, bc.getFrameMax());
+    connectionSettings.maxFrameSize = bc.getFrameMax();
+    shadowConnection.open(updateeUrl, connectionSettings);
     bc.eachSessionHandler(boost::bind(&UpdateClient::updateSession, this, _1));
+    std::pair<const char*, size_t> fragment = updateConnection->getDecoder().getFragment();
     ClusterConnectionProxy(shadowConnection).shadowReady(
         updateConnection->getId().getMember(),
-        reinterpret_cast<uint64_t>(updateConnection->getId().getPointer()),
-        updateConnection->getBrokerConnection().getUserId()
+        updateConnection->getId().getNumber(),
+        bc.getUserId(),
+        string(fragment.first, fragment.second)
     );
     shadowConnection.close();
     QPID_LOG(debug, updaterId << " updated connection " << *updateConnection);
@@ -285,9 +287,6 @@
     if (inProgress) {
         inProgress->getFrames().map(simpl->out);
     }
-
-    // FIXME aconway 2008-09-23: update session replay list.
-
     QPID_LOG(debug, updaterId << " updated session " << sh.getSession()->getId());
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h?rev=749473&r1=749472&r2=749473&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h Mon Mar  2 23:30:08 2009
@@ -98,6 +98,7 @@
     client::AsyncSession session, shadowSession;
     boost::function<void()> done;
     boost::function<void(const std::exception& e)> failed;
+    client::ConnectionSettings connectionSettings;
 };
 
 }} // namespace qpid::cluster

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/types.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h?rev=749473&r1=749472&r2=749473&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/types.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/types.h Mon Mar  2 23:30:08 2009
@@ -68,17 +68,16 @@
 
 std::ostream& operator<<(std::ostream&, const MemberId&);
 
-struct ConnectionId : public std::pair<MemberId, Connection*>  {
-    ConnectionId(const MemberId& m=MemberId(), Connection* c=0) :  std::pair<MemberId, Connection*> (m,c) {}
-    ConnectionId(uint64_t m, uint64_t c)
-        : std::pair<MemberId, Connection*>(MemberId(m), reinterpret_cast<Connection*>(c)) {}
+struct ConnectionId : public std::pair<MemberId, uint64_t>  {
+    ConnectionId(const MemberId& m=MemberId(), uint64_t c=0) :  std::pair<MemberId, uint64_t> (m,c) {}
+    ConnectionId(uint64_t m, uint64_t c) : std::pair<MemberId, uint64_t>(MemberId(m), c) {}
     MemberId getMember() const { return first; }
-    Connection* getPointer() const { return second; }
+    uint64_t getNumber() const { return second; }
 };
 
 std::ostream& operator<<(std::ostream&, const ConnectionId&);
 
-std::ostream& operator << (std::ostream&, EventType);
+std::ostream& operator<<(std::ostream&, EventType);
 
 }} // namespace qpid::cluster
 

Modified: qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.cpp?rev=749473&r1=749472&r2=749473&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.cpp Mon Mar  2 23:30:08 2009
@@ -21,8 +21,9 @@
 #include "FrameDecoder.h"
 #include "Buffer.h"
 #include "qpid/log/Statement.h"
-#include <algorithm>
 #include "qpid/framing/reply_exceptions.h"
+#include <algorithm>
+#include <string.h>
 
 namespace qpid {
 namespace framing {
@@ -67,4 +68,13 @@
     return false;
 }
 
+void FrameDecoder::setFragment(const char* data, size_t size) {
+    fragment.resize(size);
+    ::memcpy(fragment.data(), data, size);
+}
+
+std::pair<const char*, size_t> FrameDecoder::getFragment() const {
+    return std::pair<const char*, size_t>(fragment.data(), fragment.size());
+}
+
 }} // namespace qpid::framing

Modified: qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.h?rev=749473&r1=749472&r2=749473&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.h Mon Mar  2 23:30:08 2009
@@ -35,9 +35,16 @@
 {
   public:
     bool decode(Buffer& buffer);
-    AMQFrame frame;
+    const AMQFrame& getFrame() const { return frame; }
+    AMQFrame& getFrame() { return frame; }
+
+    void setFragment(const char*, size_t);
+    std::pair<const char*, size_t> getFragment() const;
+
   private:
     std::vector<char> fragment;
+    AMQFrame frame;
+
 };
 }} // namespace qpid::framing
 

Modified: qpid/trunk/qpid/cpp/src/tests/ClusterFixture.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ClusterFixture.cpp?rev=749473&r1=749472&r2=749473&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ClusterFixture.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/ClusterFixture.cpp Mon Mar  2 23:30:08 2009
@@ -109,7 +109,7 @@
     Args args(makeArgs(prefix));
     vector<const char*> argv(args.size());
     transform(args.begin(), args.end(), argv.begin(), boost::bind(&string::c_str, _1));
-    qpid::log::Logger::instance().setPrefix(os.str());
+    qpid::log::Logger::instance().setPrefix(prefix);
     localBroker.reset(new BrokerFixture(parseOpts(argv.size(), &argv[0])));
     push_back(localBroker->getPort());
     forkedBrokers.push_back(shared_ptr<ForkedBroker>());

Modified: qpid/trunk/qpid/cpp/src/tests/FrameDecoder.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/FrameDecoder.cpp?rev=749473&r1=749472&r2=749473&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/FrameDecoder.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/FrameDecoder.cpp Mon Mar  2 23:30:08 2009
@@ -65,7 +65,7 @@
     }
     Buffer buf(&encoded[encoded.size()-1], 1);
     BOOST_CHECK(decoder.decode(buf));
-    BOOST_CHECK_EQUAL(data, getData(decoder.frame));
+    BOOST_CHECK_EQUAL(data, getData(decoder.getFrame()));
 }
 
 

Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=749473&r1=749472&r2=749473&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Mon Mar  2 23:30:08 2009
@@ -125,6 +125,7 @@
       <field name="member-id" type="uint64"/>
       <field name="connection-id" type="uint64"/>
       <field name="user-name" type="str8"/>
+      <field name="fragment" type="str32"/>
     </control>
 
     <!-- Complete a cluster state update. -->



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Re: svn commit: r749473 - in /qpid/trunk/qpid/cpp: src/ src/qpid/cluster/ src/qpid/framing/ src/tests/ xml/

Posted by Aidan Skinner <ai...@gmail.com>.
On Tue, Mar 3, 2009 at 5:58 PM, Gordon Sim <gs...@redhat.com> wrote:
> Aidan Skinner wrote:
>>
>> Still broken for me, I'm at 749627:
>
> Try 749669.

That fixed it, thanks! :)

- Aidan
-- 
Apache Qpid - World Domination through Advanced Message Queueing
http://qpid.apache.org

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org


Re: svn commit: r749473 - in /qpid/trunk/qpid/cpp: src/ src/qpid/cluster/ src/qpid/framing/ src/tests/ xml/

Posted by Gordon Sim <gs...@redhat.com>.
Aidan Skinner wrote:
> Still broken for me, I'm at 749627:

Try 749669.


---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org


Re: svn commit: r749473 - in /qpid/trunk/qpid/cpp: src/ src/qpid/cluster/ src/qpid/framing/ src/tests/ xml/

Posted by Aidan Skinner <ai...@gmail.com>.
On Tue, Mar 3, 2009 at 4:54 PM, Gordon Sim <gs...@redhat.com> wrote:

> Aidan Skinner wrote:
>>
>> On Mon, Mar 2, 2009 at 11:30 PM,  <ac...@apache.org> wrote:
>>
>>> Author: aconway
>>> Date: Mon Mar  2 23:30:08 2009
>>> New Revision: 749473
>>>
>>> URL: http://svn.apache.org/viewvc?rev=749473&view=rev
>>> Log:
>>>
>>> Replicate connection decoder fragments to new members.
>>
>> This seems to have broken the C++ build for me on RHEL4
>
> Should be fixed in 749621.

Still broken for me, I'm at 749627:

qpid/framing/FrameDecoder.cpp: In member function `std::pair<const
char*, size_t> qpid::framing::FrameDecoder::getFragment() const':
qpid/framing/FrameDecoder.cpp:77: error: 'const class
std::vector<char, std::allocator<char> >' has no member named 'data'

- Aidan
-- 
Apache Qpid - World Domination through Advanced Message Queueing
http://qpid.apache.org

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org


Re: svn commit: r749473 - in /qpid/trunk/qpid/cpp: src/ src/qpid/cluster/ src/qpid/framing/ src/tests/ xml/

Posted by Gordon Sim <gs...@redhat.com>.
Aidan Skinner wrote:
> On Mon, Mar 2, 2009 at 11:30 PM,  <ac...@apache.org> wrote:
> 
>> Author: aconway
>> Date: Mon Mar  2 23:30:08 2009
>> New Revision: 749473
>>
>> URL: http://svn.apache.org/viewvc?rev=749473&view=rev
>> Log:
>>
>> Replicate connection decoder fragments to new members.
> 
> This seems to have broken the C++ build for me on RHEL4

Should be fixed in 749621.


---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org


Re: svn commit: r749473 - in /qpid/trunk/qpid/cpp: src/ src/qpid/cluster/ src/qpid/framing/ src/tests/ xml/

Posted by Aidan Skinner <ai...@apache.org>.
On Mon, Mar 2, 2009 at 11:30 PM,  <ac...@apache.org> wrote:

> Author: aconway
> Date: Mon Mar  2 23:30:08 2009
> New Revision: 749473
>
> URL: http://svn.apache.org/viewvc?rev=749473&view=rev
> Log:
>
> Replicate connection decoder fragments to new members.

This seems to have broken the C++ build for me on RHEL4, I get:
 g++ -DHAVE_CONFIG_H -I. -I. -I. -Igen -I./gen -pedantic -Wall -Wextra
-Wno-shadow -Wpointer-arith -Wcast-qual -Wcast-align -Wno-long-long
-Winvalid-pch -Wno-system-headers -Woverloaded-virtual -g -O2 -MT
qpid/framing/FrameDecoder.lo -MD -MP -MF
qpid/framing/.deps/FrameDecoder.Tpo -c qpid/framing/FrameDecoder.cpp
-fPIC -DPIC -o qpid/framing/.libs/FrameDecoder.o
qpid/framing/FrameDecoder.cpp: In member function `void
qpid::framing::FrameDecoder::setFragment(const char*, size_t)':
qpid/framing/FrameDecoder.cpp:73: error: 'class std::vector<char,
std::allocator<char> >' has no member named 'data'
qpid/framing/FrameDecoder.cpp: In member function `std::pair<const
char*, size_t> qpid::framing::FrameDecoder::getFragment() const':
qpid/framing/FrameDecoder.cpp:77: error: 'const class
std::vector<char, std::allocator<char> >' has no member named 'data'

:(

- Aidan
-- 
Apache Qpid - World Domination through Advanced Message Queueing
http://qpid.apache.org

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org