You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/09/03 05:21:01 UTC

svn commit: r691489 - in /incubator/qpid/trunk/qpid/cpp: src/ src/qpid/amqp_0_10/ src/qpid/cluster/ src/qpid/framing/ src/tests/ xml/

Author: aconway
Date: Tue Sep  2 20:21:00 2008
New Revision: 691489

URL: http://svn.apache.org/viewvc?rev=691489&view=rev
Log:
Cluster multicasts buffers rather than frames.

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.h   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h
    incubator/qpid/trunk/qpid/cpp/src/tests/.valgrind.supp
    incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
    incubator/qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=691489&r1=691488&r2=691489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Tue Sep  2 20:21:00 2008
@@ -209,6 +209,7 @@
   qpid/framing/FieldTable.cpp \
   qpid/framing/FieldValue.cpp \
   qpid/framing/FrameSet.cpp \
+  qpid/framing/FrameDecoder.cpp \
   qpid/framing/ProtocolInitiation.cpp \
   qpid/framing/ProtocolVersion.cpp \
   qpid/framing/SendContent.cpp \
@@ -493,7 +494,7 @@
   qpid/framing/FieldTable.h \
   qpid/framing/FieldValue.h \
   qpid/framing/FrameDefaultVisitor.h \
-  qpid/framing/FrameHandler.h \
+  qpid/framing/FrameDecoder.h \
   qpid/framing/FrameHandler.h \
   qpid/framing/FrameSet.h \
   qpid/framing/Handler.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp?rev=691489&r1=691488&r2=691489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp Tue Sep  2 20:21:00 2008
@@ -59,7 +59,7 @@
 
 bool Connection::canEncode() {
     if (!frameQueueClosed) connection->doOutput();
-        Mutex::ScopedLock l(frameQueueLock);
+    Mutex::ScopedLock l(frameQueueLock);
     return (!isClient && !initialized) || !frameQueue.empty();
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=691489&r1=691488&r2=691489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Sep  2 20:21:00 2008
@@ -4,7 +4,7 @@
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
-n * You may obtain a copy of the License at
+ * You may obtain a copy of the License at
  *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -20,14 +20,17 @@
 #include "Connection.h"
 
 #include "qpid/broker/Broker.h"
+#include "qpid/broker/SessionState.h"
+#include "qpid/broker/Connection.h"
 #include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/ClusterJoinedBody.h"
+#include "qpid/framing/AMQP_AllOperations.h"
+#include "qpid/framing/AllInvoker.h"
+#include "qpid/framing/ClusterUrlNoticeBody.h"
+#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
+#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
 #include "qpid/log/Statement.h"
 #include "qpid/memory.h"
 #include "qpid/shared_ptr.h"
-#include "qpid/framing/AMQP_AllOperations.h"
-#include "qpid/framing/AllInvoker.h"
-#include "qpid/framing/Invoker.h"
 
 #include <boost/bind.hpp>
 #include <boost/cast.hpp>
@@ -38,36 +41,17 @@
 
 namespace qpid {
 namespace cluster {
-
 using namespace qpid::framing;
 using namespace qpid::sys;
 using namespace std;
 
-// Handle cluster controls from a given member.
-struct ClusterOperations :  public framing::AMQP_AllOperations::ClusterHandler {
+struct ClusterOperations : public AMQP_AllOperations::ClusterHandler {
     Cluster& cluster;
     MemberId member;
-    
-    ClusterOperations(Cluster& c, const MemberId& m) : cluster(c), member(m) {}
-
-    void joined(const std::string& url) {
-        cluster.joined(member, url);
-    }
+    ClusterOperations(Cluster& c, const MemberId& id) : cluster(c), member(id) {}
+    void urlNotice(const std::string& u) { cluster.urlNotice (member, u); }
+    bool invoke(AMQFrame& f) { return framing::invoke(*this, *f.getBody()).wasHandled(); }
 };
-    
-ostream& operator <<(ostream& out, const Cluster& cluster) {
-    return out << cluster.name.str() << "-" << cluster.self;
-}
-
-ostream& operator<<(ostream& out, const Cluster::UrlMap::value_type& m) {
-    return out << m.first << " at " << m.second;
-}
-
-ostream& operator <<(ostream& out, const Cluster::UrlMap& urls) {
-    ostream_iterator<Cluster::UrlMap::value_type> o(out, " ");
-    copy(urls.begin(), urls.end(), o);
-    return out;
-}
 
 Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
     broker(&b),
@@ -80,30 +64,39 @@
                       boost::bind(&Cluster::dispatch, this, _1), // read
                       0,                                         // write
                       boost::bind(&Cluster::disconnect, this, _1) // disconnect
-    ),
-    deliverQueue(boost::bind(&Cluster::deliverQueueCb, this, _1, _2)),
-    mcastQueue(boost::bind(&Cluster::mcastQueueCb, this, _1, _2))
+    )
 {
     broker->addFinalizer(boost::bind(&Cluster::leave, this));
-    QPID_LOG(trace, "Node " << self << " joining cluster: " << name_);
+    QPID_LOG(trace, "Joining cluster: " << name << " as " << self);
     cpg.join(name);
-    send(AMQFrame(in_place<ClusterJoinedBody>(ProtocolVersion(), url.str())), ConnectionId(self,0));
+    mcastFrame(AMQFrame(in_place<ClusterUrlNoticeBody>(ProtocolVersion(), url.str())),
+               ConnectionId(self,0));
 
     // Start dispatching from the poller.
     cpgDispatchHandle.startWatch(poller);
-    deliverQueue.start(poller);
-    mcastQueue.start(poller);
 }
 
 Cluster::~Cluster() {}
 
+void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
+    Mutex::ScopedLock l(lock);
+    connections.insert(ConnectionMap::value_type(ConnectionId(self, c.get()), c));
+}
+
+void Cluster::erase(ConnectionId id) {
+    Mutex::ScopedLock l(lock);
+    connections.erase(id);
+}
+
 void Cluster::leave() {
     Mutex::ScopedLock l(lock);
     if (!broker) return;                               // Already left.
     // Leave is called by from Broker destructor after the poller has
     // been shut down. No dispatches can occur.
+
+    QPID_LOG(debug, "Leaving cluster " << name.str());
     cpg.leave(name);
-    // broker is set to 0 when the final config-change is delivered.
+    // broker is set to 0 when the final config-change is delivered.
     while(broker) {
         Mutex::ScopedUnlock u(lock);
         cpg.dispatchAll();
@@ -121,30 +114,30 @@
     buf.putLongLong(value);
 }
 
-void Cluster::send(const AMQFrame& frame, const ConnectionId& id) {
-    QPID_LOG(trace, "MCAST [" << id << "] " << frame);
-    mcastQueue.push(Message(frame, id));
-}
-
-void Cluster::mcastQueueCb(const MessageQueue::iterator& begin,
-                           const MessageQueue::iterator& end)
-{
-    // Static is OK because there is only one cluster allowed per
-    // process and only one thread in mcastQueueCb at a time.
-    static char buffer[64*1024]; // FIXME aconway 2008-07-04: buffer management.
+void Cluster::mcastFrame(const AMQFrame& frame, const ConnectionId& connection) {
+    QPID_LOG(trace, "MCAST [" << connection << "] " << frame);
+    // FIXME aconway 2008-09-02: restore queueing.
+    Mutex::ScopedLock l(lock); // FIXME aconway 2008-09-02: review locking.
+    static char buffer[64*1024]; // FIXME aconway 2008-07-04: buffer management or FrameEncoder.
     Buffer buf(buffer, sizeof(buffer));
-    for (MessageQueue::iterator i = begin; i != end; ++i) {
-        AMQFrame& frame =i->first;
-        ConnectionId id =i->second;
-        if (buf.available() < frame.size() + sizeof(uint64_t))
-            break;
-        frame.encode(buf);
-        encodePtr(buf, id.second);
-    }
+    buf.putOctet(CONTROL);
+    encodePtr(buf, connection.getConnectionPtr());
+    frame.encode(buf);
     iovec iov = { buffer, buf.getPosition() };
     cpg.mcast(name, &iov, 1);
 }
 
+void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& id) {
+    // FIXME aconway 2008-09-02: does this need locking?
+    Mutex::ScopedLock l(lock); // FIXME aconway 2008-09-02: review locking.
+    char hdrbuf[1+sizeof(uint64_t)];
+    Buffer buf(hdrbuf, sizeof(hdrbuf));
+    buf.putOctet(DATA);
+    encodePtr(buf, id.getConnectionPtr());
+    iovec iov[] = { { hdrbuf, buf.getPosition() }, { const_cast<char*>(data), size } };
+    cpg.mcast(name, iov, sizeof(iov)/sizeof(*iov));
+}
+
 size_t Cluster::size() const {
     Mutex::ScopedLock l(lock);
     return urls.size();
@@ -153,19 +146,23 @@
 std::vector<Url> Cluster::getUrls() const {
     Mutex::ScopedLock l(lock);
     std::vector<Url> result(urls.size());
-    std::transform(urls.begin(), urls.end(), result.begin(), boost::bind(&UrlMap::value_type::second, _1));
+    std::transform(urls.begin(), urls.end(), result.begin(),
+                   boost::bind(&UrlMap::value_type::second, _1));
     return result;        
 }
 
 boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& id) {
-    boost::intrusive_ptr<Connection> c = connections[id];
-    if (!c && id.first != self) { // Shadow connection
-        std::ostringstream os;
-        os << id;
-        c = connections[id] = new Connection(*this, shadowOut, os.str(), id);
+    if (id.getMember() == self)
+        return boost::intrusive_ptr<Connection>(id.getConnectionPtr());
+    ConnectionMap::iterator i = connections.find(id);
+    if (i == connections.end()) { // New shadow connection.
+        assert(id.getMember() != self);
+        std::ostringstream mgmtId;
+        mgmtId << name << ":"  << id;
+        ConnectionMap::value_type value(id, new Connection(*this, shadowOut, mgmtId.str(), id));
+        i = connections.insert(value).first;
     }
-    assert(c);
-    return c;
+    return i->second;
 }
 
 void Cluster::deliver(
@@ -176,17 +173,28 @@
     void* msg,
     int msg_len)
 {
-    MemberId from(nodeid, pid);
     try {
+        MemberId from(nodeid, pid);
         Buffer buf(static_cast<char*>(msg), msg_len);
-        while (buf.available() > 0) {
+        Connection* connection;
+        uint8_t type = buf.getOctet();
+        decodePtr(buf, connection);
+        if (connection == 0)  { // Cluster controls
             AMQFrame frame;
-            if (!frame.decode(buf))  // Not enough data.
-                throw Exception("Received incomplete cluster event.");
-            Connection* cp;
-            decodePtr(buf, cp);
-            QPID_LOG(critical, "deliverQ.push " << frame);
-            deliverQueue.push(Message(frame, ConnectionId(from, cp)));
+            while (frame.decode(buf)) 
+                if (!ClusterOperations(*this, from).invoke(frame))
+                    throw Exception("Invalid cluster control");
+        }
+        else {                  // Connection data or control
+            boost::intrusive_ptr<Connection> c =
+                getConnection(ConnectionId(from, connection));
+            if (type == DATA)
+                c->deliverBuffer(buf);
+            else {
+                AMQFrame frame;
+                while (frame.decode(buf))
+                    c->deliver(frame);
+            }
         }
     }
     catch (const std::exception& e) {
@@ -197,59 +205,24 @@
     }
 }
 
-void Cluster::deliverQueueCb(const MessageQueue::iterator& begin,
-                             const MessageQueue::iterator& end)
-{
-    for (MessageQueue::iterator i = begin; i != end; ++i) {
-        AMQFrame& frame(i->first);
-        ConnectionId connectionId(i->second);
-        try {
-            QPID_LOG(trace, "DLVR [" << connectionId << "]: " << frame);
-            if (!broker) {
-                QPID_LOG(error, "Unexpected DLVR after leaving the cluster.");
-                return;
-            }
-            if (connectionId.getConnectionPtr()) // Connection control
-                getConnection(connectionId)->deliver(frame);
-            else {              // Cluster control
-                ClusterOperations cops(*this, connectionId.getMember());
-                bool invoked = framing::invoke(cops, *frame.getBody()).wasHandled();
-                assert(invoked);
-            }
-        }
-        catch (const std::exception& e) {
-            // FIXME aconway 2008-01-30: exception handling.
-            QPID_LOG(critical, "Error in cluster deliverQueueCb: " << e.what());
-            assert(0);
-            throw;
-        }
-    }
-}
-
-void Cluster::joined(const MemberId& member, const string& url) {
-    Mutex::ScopedLock l(lock);
-    QPID_LOG(debug, member << " has URL " << url);
-    urls[member] = url;
-    lock.notifyAll();
-}
-
 void Cluster::configChange(
     cpg_handle_t /*handle*/,
     cpg_name */*group*/,
-    cpg_address */*current*/, int /*nCurrent*/,
+    cpg_address *current, int nCurrent,
     cpg_address *left, int nLeft,
-    cpg_address *joined, int nJoined)
+    cpg_address */*joined*/, int /*nJoined*/)
 {
-    QPID_LOG(debug, "Cluster change: " << std::make_pair(joined, nJoined) << std::make_pair(left, nLeft));
+    QPID_LOG(debug, "Cluster change: "
+             << std::make_pair(current, nCurrent)
+             << std::make_pair(left, nLeft));
+
     Mutex::ScopedLock l(lock);
-    // We add URLs to the map in joined() we don't keep track of pre-URL members yet.
-    for (int l = 0; l < nLeft; ++l) urls.erase(left[l]);
+    for (int i = 0; i < nLeft; ++i) urls.erase(left[i]);
+    // Add new members when their URL notice arrives.
 
-    if (std::find(left, left+nLeft, self) != left+nLeft) {
+    if (std::find(left, left+nLeft, self) != left+nLeft)
         broker = 0;       // We have left the group, this is the final config change.
-        QPID_LOG(debug, "Leaving cluster " << *this);
-    }
-    lock.notifyAll();     // Threads waiting for url changes.  
+    lock.notifyAll();     // Threads waiting for membership changes.
 }
 
 void Cluster::dispatch(sys::DispatchHandle& h) {
@@ -263,14 +236,8 @@
     broker->shutdown();
 }
 
-void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
-    Mutex::ScopedLock l(lock);
-    connections[c->getId()] = c;
-}
-
-void Cluster::erase(ConnectionId id) {
-    Mutex::ScopedLock l(lock);
-    connections.erase(id);
+void Cluster::urlNotice(const MemberId& m, const std::string& url) {
+    urls.insert(UrlMap::value_type(m,Url(url)));
 }
 
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=691489&r1=691488&r2=691489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Tue Sep  2 20:21:00 2008
@@ -68,17 +68,18 @@
     bool empty() const { return size() == 0; }
     
     /** Send frame to the cluster */
-    void send(const framing::AMQFrame&, const ConnectionId&);
+    void mcastFrame(const framing::AMQFrame&, const ConnectionId&);
+    void mcastBuffer(const char*, size_t, const ConnectionId&);
 
     /** Leave the cluster */
     void leave();
     
-    void joined(const MemberId&, const std::string& url);
+    void urlNotice(const MemberId&, const std::string& url);
 
     broker::Broker& getBroker() { assert(broker); return *broker; }
 
     MemberId getSelf() const { return self; }
-    
+
   private:
     typedef std::map<MemberId, Url>  UrlMap;
     typedef std::map<ConnectionId, boost::intrusive_ptr<cluster::Connection> > ConnectionMap;
@@ -88,6 +89,11 @@
     typedef PollableQueue<Message> MessageQueue;
 
     boost::function<void()> shutdownNext;
+
+    /** Handle a delivered frame */
+    void deliverFrame(framing::AMQFrame&, const ConnectionId&);
+
+    void deliverBuffer(const char*, size_t, const ConnectionId&);
     
     /** CPG deliver callback. */
     void deliver(
@@ -107,15 +113,6 @@
         struct cpg_address */*joined*/, int /*nJoined*/
     );
 
-    /** Callback to handle delivered frames from the deliverQueue. */
-    void deliverQueueCb(const MessageQueue::iterator& begin,
-                      const MessageQueue::iterator& end);
-
-    /** Callback to multi-cast frames from mcastQueue */
-    void mcastQueueCb(const MessageQueue::iterator& begin,
-                    const MessageQueue::iterator& end);
-
-
     /** Callback to dispatch CPG events. */
     void dispatch(sys::DispatchHandle&);
     /** Callback if CPG fd is disconnected. */
@@ -136,8 +133,6 @@
     ConnectionMap connections;
     NoOpConnectionOutputHandler shadowOut;
     sys::DispatchHandle cpgDispatchHandle;
-    MessageQueue deliverQueue;
-    MessageQueue mcastQueue;
 
   friend std::ostream& operator <<(std::ostream&, const Cluster&);
   friend std::ostream& operator <<(std::ostream&, const UrlMap::value_type&);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=691489&r1=691488&r2=691489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Tue Sep  2 20:21:00 2008
@@ -46,16 +46,23 @@
 
 Connection::~Connection() {}
 
-// Forward all received frames to the cluster, continue handling on delivery.
-void Connection::received(framing::AMQFrame& f) {
-    cluster.send(f, self);
+void Connection::received(framing::AMQFrame& ) {
+    // FIXME aconway 2008-09-02: not called, codec sends straight to deliver
+    assert(0);
 }
 
-// Don't doOutput in the 
-bool Connection::doOutput() { return output.doOutput(); } 
+bool Connection::doOutput() { return output.doOutput(); }
+
+// Delivery of doOutput allows us to run the real connection doOutput()
+// which stocks up the write buffers with data.
+//
+void Connection::deliverDoOutput(uint32_t requested) {
+    output.deliverDoOutput(requested);
+}
 
 // Handle frames delivered from cluster.
 void Connection::deliver(framing::AMQFrame& f) {
+    QPID_LOG(trace, "DLVR [" << self << "]: " << f);
     // Handle connection controls, deliver other frames to connection.
     if (!framing::invoke(*this, *f.getBody()).wasHandled())
         connection.received(f);
@@ -71,7 +78,8 @@
         // handler will be deleted.
         // 
         connection.setOutputHandler(&discardHandler); 
-        cluster.send(AMQFrame(in_place<ClusterConnectionDeliverCloseBody>()), self);
+        cluster.mcastFrame(AMQFrame(in_place<ClusterConnectionDeliverCloseBody>()), self);
+        ++mcastSeq;
     }
     catch (const std::exception& e) {
         QPID_LOG(error, QPID_MSG("While closing connection: " << e.what()));
@@ -83,11 +91,19 @@
     cluster.erase(self);
 }
 
-// Delivery of doOutput allows us to run the real connection doOutput()
-// which stocks up the write buffers with data.
-// 
-void Connection::deliverDoOutput(uint32_t requested) {
-    output.deliverDoOutput(requested);
+size_t Connection::decode(const char* buffer, size_t size) { 
+    QPID_LOG(trace, "mcastBuffer " << self << " " << mcastSeq << " " << size);
+    ++mcastSeq;
+    cluster.mcastBuffer(buffer, size, self);
+    // FIXME aconway 2008-09-01: deserialize?
+    return size;
+}
+
+void Connection::deliverBuffer(Buffer& buf) {
+    QPID_LOG(trace, "deliverBuffer " << self << " " << deliverSeq << " " << buf.available());
+    ++deliverSeq;
+    while (decoder.decode(buf))
+        deliver(decoder.frame); // FIXME aconway 2008-09-01: Queue frames for delivery in separate thread.
 }
 
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=691489&r1=691488&r2=691489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Tue Sep  2 20:21:00 2008
@@ -31,6 +31,8 @@
 #include "qpid/amqp_0_10/Connection.h"
 #include "qpid/sys/ConnectionInputHandler.h"
 #include "qpid/sys/ConnectionOutputHandler.h"
+#include "qpid/framing/FrameDecoder.h"
+#include "qpid/framing/SequenceNumber.h"
 
 namespace qpid {
 
@@ -56,16 +58,16 @@
     ~Connection();
     
     ConnectionId getId() const { return self; }
+    broker::Connection& getBrokerConnection() { return connection; }
     bool isLocal() const { return self.second == this; }
 
-    // self-delivery of intercepted extension points.
+    Cluster& getCluster() { return cluster; }
+
+    // self-delivery of multicast data.
     void deliver(framing::AMQFrame& f);
     void deliverClose();
     void deliverDoOutput(uint32_t requested);
-
-    void codecDeleted();
-    
-    Cluster& getCluster() { return cluster; }
+    void deliverBuffer(framing::Buffer&);
 
     // ConnectionOutputHandler methods
     void close() {}
@@ -78,13 +80,15 @@
     void closed();
     bool doOutput();
     bool hasOutput() { return connection.hasOutput(); }
-    void idleOut() { idleOut(); }
-    void idleIn() { idleIn(); }
+    void idleOut() { connection.idleOut(); }
+    void idleIn() { connection.idleIn(); }
+
+    // ConnectionCodec methods
+    size_t decode(const char* buffer, size_t size);
 
     // ConnectionInputHandlerFactory
     sys::ConnectionInputHandler* create(sys::ConnectionOutputHandler* out, const std::string& id, bool isClient);
 
-    broker::Connection& getBrokerConnection() { return connection; }
   private:
     void sendDoOutput();
 
@@ -93,7 +97,10 @@
     NoOpConnectionOutputHandler discardHandler;
     WriteEstimate writeEstimate;
     OutputInterceptor output;
+    framing::FrameDecoder decoder;
     broker::Connection connection;
+    framing::SequenceNumber mcastSeq;
+    framing::SequenceNumber deliverSeq;
 };
 
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp?rev=691489&r1=691488&r2=691489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp Tue Sep  2 20:21:00 2008
@@ -22,6 +22,7 @@
 #include "Connection.h"
 #include "ProxyInputHandler.h"
 #include "qpid/broker/Connection.h"
+#include "qpid/log/Statement.h"
 #include "qpid/memory.h"
 
 namespace qpid {
@@ -54,7 +55,11 @@
 ConnectionCodec::~ConnectionCodec() {}
 
 // ConnectionCodec functions delegate to the codecOutput
-size_t ConnectionCodec::decode(const char* buffer, size_t size) { return codec.decode(buffer, size); }
+size_t ConnectionCodec::decode(const char* buffer, size_t size) {
+    return interceptor->decode(buffer, size);
+}
+
+// FIXME aconway 2008-09-02: delegate to interceptor?
 size_t ConnectionCodec::encode(const char* buffer, size_t size) { return codec.encode(buffer, size); }
 bool ConnectionCodec::canEncode() { return codec.canEncode(); }
 void ConnectionCodec::closed() { codec.closed(); }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h?rev=691489&r1=691488&r2=691489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h Tue Sep  2 20:21:00 2008
@@ -58,7 +58,7 @@
     ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& c);
     ~ConnectionCodec();
 
-    // ConnectionCodec functions delegate to the codecOutput
+    // ConnectionCodec functions.
     size_t decode(const char* buffer, size_t size);
     size_t encode(const char* buffer, size_t size);
     bool canEncode();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp?rev=691489&r1=691488&r2=691489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp Tue Sep  2 20:21:00 2008
@@ -183,10 +183,6 @@
     return o << c.first << "-" << c.second;
 }
 
-ostream& operator<<(ostream& o, const cpg_name& name) {
-    return o << string(name.value, name.length);
-}
-
 }} // namespace qpid::cluster
 
 
@@ -195,16 +191,18 @@
 std::ostream& operator<<(std::ostream& o, const ::cpg_address& a) {
     const char* reasonString;
     switch (a.reason) {
-      case CPG_REASON_JOIN: reasonString = "joined"; break;
-      case CPG_REASON_LEAVE: reasonString = "left";break;
-      case CPG_REASON_NODEDOWN: reasonString = "node-down";break;
-      case CPG_REASON_NODEUP: reasonString = "node-up";break;
-      case CPG_REASON_PROCDOWN: reasonString = "process-down";break;
-      default:
-        assert(0);
-        reasonString = "";
+      case CPG_REASON_JOIN: reasonString = " joined"; break;
+      case CPG_REASON_LEAVE: reasonString = " left";break;
+      case CPG_REASON_NODEDOWN: reasonString = " node-down";break;
+      case CPG_REASON_NODEUP: reasonString = " node-up";break;
+      case CPG_REASON_PROCDOWN: reasonString = " process-down";break;
+      default: reasonString = "";
     }
-    return o << qpid::cluster::MemberId(a.nodeid, a.pid) << " " << reasonString;
+    return o << qpid::cluster::MemberId(a.nodeid, a.pid) << reasonString;
+}
+
+std::ostream& operator<<(std::ostream& o, const cpg_name& name) {
+    return o << std::string(name.value, name.length);
 }
 
 namespace std {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=691489&r1=691488&r2=691489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Tue Sep  2 20:21:00 2008
@@ -97,7 +97,7 @@
     // Send it anyway to keep the doOutput chain going until we are sure there's no more output
     // (in deliverDoOutput)
     // 
-    parent.getCluster().send(AMQFrame(in_place<ClusterConnectionDeliverDoOutputBody>(
+    parent.getCluster().mcastFrame(AMQFrame(in_place<ClusterConnectionDeliverDoOutputBody>(
                                           framing::ProtocolVersion(), request)), parent.getId());
     QPID_LOG(trace, &parent << "Send doOutput request for " << request);
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h?rev=691489&r1=691488&r2=691489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h Tue Sep  2 20:21:00 2008
@@ -34,6 +34,9 @@
 
 class Connection;
 
+/** Types of cluster messages. */
+enum EventType { DATA, CONTROL };
+
 /** first=node-id, second=pid */
 struct MemberId : std::pair<uint32_t, uint32_t> {
     MemberId(uint32_t node=0, uint32_t pid=0) : std::pair<uint32_t,uint32_t>(node, pid) {}
@@ -51,6 +54,7 @@
     MemberId getMember() const { return first; }
     Connection* getConnectionPtr() const { return second; }
 };
+
 std::ostream& operator<<(std::ostream&, const ConnectionId&);
 
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp?rev=691489&r1=691488&r2=691489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp Tue Sep  2 20:21:00 2008
@@ -45,6 +45,13 @@
     return 12 /*frame header*/;
 }
 
+uint16_t AMQFrame::DECODE_SIZE_MIN=4;
+
+uint16_t AMQFrame::decodeSize(char* data) {
+    Buffer buf(data+2, DECODE_SIZE_MIN);
+    return buf.getShort();
+}
+
 void AMQFrame::encode(Buffer& buffer) const
 {
     //set track first (controls on track 0, everything else on 1):

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h?rev=691489&r1=691488&r2=691489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h Tue Sep  2 20:21:00 2008
@@ -104,7 +104,10 @@
     bool getEos() const { return eos; }
     void setEos(bool isEos) { eos = isEos; }
 
+    static uint16_t DECODE_SIZE_MIN;
     static uint32_t frameOverhead();
+    /** Must point to at least DECODE_SIZE_MIN bytes of data */
+    static uint16_t decodeSize(char* data);
   private:
     void init() { bof = eof = bos = eos = true; subchannel=0; channel=0; }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h?rev=691489&r1=691488&r2=691489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h Tue Sep  2 20:21:00 2008
@@ -74,6 +74,7 @@
     uint32_t getSize() { return size; }
     uint32_t getPosition() { return position; }
     Iterator getIterator() { return Iterator(*this); }
+    char* getPointer() { return data; } // Returns the data member as a raw pointer; nothing is copied.
         
     void putOctet(uint8_t i);
     void putShort(uint16_t i);

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.cpp?rev=691489&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.cpp Tue Sep  2 20:21:00 2008
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "FrameDecoder.h"
+#include "Buffer.h"
+#include "qpid/log/Statement.h"
+#include <algorithm>
+
+namespace qpid {
+namespace framing {
+
+namespace {
+/** Append up to n bytes (fewer if buffer has less available) read from buffer onto the end of bytes. */
+void move(std::vector<char>& bytes, Buffer& buffer, size_t n) {
+    size_t oldSize = bytes.size();
+    n = std::min(n, size_t(buffer.available())); // Never ask for more than buffer holds.
+    if (n == 0) return;         // Nothing to copy: avoid indexing bytes[oldSize] past the end.
+    bytes.resize(oldSize+n);    // Make room for the incoming bytes.
+    buffer.getRawData(reinterpret_cast<uint8_t*>(&bytes[oldSize]), n);
+}
+}
+
+bool FrameDecoder::decode(Buffer& buffer) { // Returns true when a frame has been decoded into `frame`.
+    if (buffer.available() == 0) return false; // No input: nothing to decode or cache.
+    if (fragment.empty()) {     
+        if (frame.decode(buffer)) // Nothing cached: decode directly from buffer.
+            return true;
+        else                    // Decode failed: cache everything left in buffer as a fragment.
+            move(fragment, buffer, buffer.available());
+    }
+    else {                      // Already have a fragment from an earlier call.
+        // Top the fragment up to DECODE_SIZE_MIN bytes so the frame size can be read.
+        if (fragment.size() < AMQFrame::DECODE_SIZE_MIN) {
+            move(fragment, buffer, AMQFrame::DECODE_SIZE_MIN - fragment.size());
+        }
+        if (fragment.size() >= AMQFrame::DECODE_SIZE_MIN) {
+            uint16_t size = AMQFrame::decodeSize(&fragment[0]);
+            assert(size > fragment.size()); // NOTE(review): assert-only check; if it is compiled out and fails, the subtraction below wraps.
+            move(fragment, buffer, size-fragment.size()); // Fetch the remainder of the frame, or as much as buffer has.
+            Buffer b(&fragment[0], fragment.size()); // Decode from the cached bytes rather than from buffer.
+            if (frame.decode(b)) {
+                assert(b.available() == 0); // Expect exactly one frame in the fragment. NOTE(review): <cassert> is not included directly.
+                fragment.clear(); // Frame consumed: start the next call with no cached bytes.
+                return true;
+            }
+        }
+    }
+    return false; // No frame decoded on this call.
+}
+
+}} // namespace qpid::framing

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.h?rev=691489&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.h Tue Sep  2 20:21:00 2008
@@ -0,0 +1,44 @@
+#ifndef QPID_FRAMING_FRAMEDECODER_H
+#define QPID_FRAMING_FRAMEDECODER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "AMQFrame.h"
+
+namespace qpid {
+namespace framing {
+
+/**
+ * Decodes frames that may be split across buffers: decode() returns true when
+ * `frame` holds a decoded frame, otherwise caches the partial bytes for the next call.
+ */
+class FrameDecoder
+{
+  public:
+    bool decode(Buffer& buffer);  // True if a frame was decoded into `frame`.
+    AMQFrame frame;               // Decode target; read it after decode() returns true.
+  private:
+    std::vector<char> fragment;   // Bytes of an incomplete frame kept between calls. NOTE(review): <vector> is not included directly here.
+};
+}} // namespace qpid::framing
+
+#endif  /*!QPID_FRAMING_FRAMEDECODER_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/.valgrind.supp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/.valgrind.supp?rev=691489&r1=691488&r2=691489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/.valgrind.supp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/.valgrind.supp Tue Sep  2 20:21:00 2008
@@ -193,9 +193,10 @@
 }
 
 {
-  CPG related errors - seem benign but should invesgitate.
+  CPG error - seems benign.
    Memcheck:Param
    socketcall.sendmsg(msg.msg_iov[i])
-   fun:sendmsg
-   obj:/usr/lib/openais/libcpg.so.2.0.0
+   obj:*
+   obj:*/libcpg.so.2.0.0
 }
+

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h?rev=691489&r1=691488&r2=691489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h Tue Sep  2 20:21:00 2008
@@ -85,7 +85,10 @@
             ::close(pipeFds[1]);
             FILE* f = ::fdopen(pipeFds[0], "r");
             if (!f) throw ErrnoException("fopen failed");
-            if (::fscanf(f, "%d", &port) != 1) throw ErrnoException("ill-formatted port");
+            if (::fscanf(f, "%d", &port) != 1) {
+                if (ferror(f)) throw ErrnoException("Error reading port number from child.");
+                else throw qpid::Exception("EOF reading port number from child.");
+            }
         }
         else {                  // child
             ::close(pipeFds[0]);

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=691489&r1=691488&r2=691489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Tue Sep  2 20:21:00 2008
@@ -27,6 +27,7 @@
 #include "qpid/client/Connection.h"
 #include "qpid/client/Session.h"
 #include "qpid/framing/Uuid.h"
+#include "qpid/log/Logger.h"
 
 #include <boost/bind.hpp>
 #include <boost/ptr_container/ptr_vector.hpp>
@@ -87,7 +88,7 @@
 
 void ClusterFixture::add() {
     std::ostringstream os;
-    os << "broker" << size();
+    os << "fork" << size();
     std::string prefix = os.str();
 
     const char* argv[] = {
@@ -105,6 +106,7 @@
     }
     else {                      // First broker, run in this process.
         Broker::Options opts;
+        qpid::log::Logger::instance().setPrefix("main");
         Plugin::addOptions(opts); // Pick up cluster options.
         opts.parse(argc, argv, "", true); // Allow-unknown for --load-module
         broker0.reset(new BrokerFixture(opts));
@@ -144,7 +146,8 @@
     ClusterFixture cluster(1);
     Client c(cluster[0]);
     BOOST_CHECK(c.session.queueQuery("q").getQueue().empty());
-    BOOST_CHECK(c.session.exchangeQuery("ex").getType().empty()); 
+    BOOST_CHECK(c.session.exchangeQuery("ex").getType().empty());
+    // FIXME aconway 2008-09-01: leaks if aisexec not running, investigate.
 }
 
 QPID_AUTO_TEST_CASE(testWiringReplication) {

Modified: incubator/qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=691489&r1=691488&r2=691489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ incubator/qpid/trunk/qpid/cpp/xml/cluster.xml Tue Sep  2 20:21:00 2008
@@ -1,4 +1,4 @@
-<?xml version="1.0"?>
+<?xml version="1.0"?>
 <!--
 -
 - Licensed to the Apache Software Foundation (ASF) under one
@@ -25,8 +25,8 @@
   <class name = "cluster" code = "0x80" label="Qpid clustering extensions.">
     <doc>Qpid extension class to allow clustered brokers to communicate.</doc>
 
-    <control name = "joined" code="0x1">
-      <field name="url" type="str16" />
+    <control name = "url-notice" code="0x1">
+      <field name="url-notice" type="str16" />
     </control>
   </class>