You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/09/06 16:10:09 UTC

svn commit: r692654 - in /incubator/qpid/trunk/qpid/cpp/src/qpid: RefCountedBuffer.cpp RefCountedBuffer.h cluster/Cluster.cpp cluster/Cluster.h cluster/Event.cpp cluster/Event.h

Author: aconway
Date: Sat Sep  6 07:10:08 2008
New Revision: 692654

URL: http://svn.apache.org/viewvc?rev=692654&view=rev
Log:
RefCountedBuffer improvements, centralize cluster encoding/decoding in Event.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/RefCountedBuffer.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/RefCountedBuffer.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/RefCountedBuffer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/RefCountedBuffer.cpp?rev=692654&r1=692653&r2=692654&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/RefCountedBuffer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/RefCountedBuffer.cpp Sat Sep  6 07:10:08 2008
@@ -34,12 +34,19 @@
     return const_cast<char*>(reinterpret_cast<const char*>(this)+sizeof(RefCountedBuffer));
 }
 
-RefCountedBuffer::intrusive_ptr RefCountedBuffer::create(size_t n) {
+RefCountedBuffer::pointer RefCountedBuffer::create(size_t n) {
     char* store=::new char[n+sizeof(RefCountedBuffer)];
     new(store) RefCountedBuffer;
-    return reinterpret_cast<RefCountedBuffer*>(store);
+    return pointer(reinterpret_cast<RefCountedBuffer*>(store));
 }
 
+RefCountedBuffer::pointer::pointer() {}
+RefCountedBuffer::pointer::pointer(RefCountedBuffer* x) : p(x) {}
+RefCountedBuffer::pointer::pointer(const pointer& x) : p(x.p) {}
+RefCountedBuffer::pointer::~pointer() {}
+RefCountedBuffer::pointer& RefCountedBuffer::pointer::operator=(const RefCountedBuffer::pointer& x) { p = x.p; return *this; }
+
+char* RefCountedBuffer::pointer::cp() const { return p ? p->get() : 0; }
 } // namespace qpid
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/RefCountedBuffer.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/RefCountedBuffer.h?rev=692654&r1=692653&r2=692654&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/RefCountedBuffer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/RefCountedBuffer.h Sat Sep  6 07:10:08 2008
@@ -27,7 +27,7 @@
 #include <boost/intrusive_ptr.hpp>
 
 namespace qpid {
-
+// FIXME aconway 2008-09-06: easy to add alignment
 /**
  * Reference-counted byte buffer.
  * No alignment guarantees.
@@ -39,11 +39,32 @@
     char* addr() const;
 
 public:
-
-    typedef boost::intrusive_ptr<RefCountedBuffer> intrusive_ptr;
-
+    /** Smart char pointer to a reference counted buffer */
+    class pointer {
+        boost::intrusive_ptr<RefCountedBuffer> p;
+        char* cp() const;
+        pointer(RefCountedBuffer* x);
+      friend class RefCountedBuffer;
+
+      public:
+        pointer();
+        pointer(const pointer&);
+        ~pointer();
+        pointer& operator=(const pointer&);
+        
+        char* get() { return cp(); }
+        operator char*() { return cp(); }
+        char& operator*() { return *cp(); }
+        char& operator[](size_t i) { return cp()[i]; }
+
+        const char* get() const { return cp(); }
+        operator const char*() const { return cp(); }
+        const char& operator*() const { return *cp(); }
+        const char& operator[](size_t i) const { return cp()[i]; }
+    };
+    
     /** Create a reference counted buffer of size n */
-    static intrusive_ptr create(size_t n);
+    static pointer create(size_t n);
 
     /** Get a pointer to the start of the buffer. */
     char* get() { return addr(); }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=692654&r1=692653&r2=692654&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Sat Sep  6 07:10:08 2008
@@ -116,26 +116,22 @@
 
 void Cluster::mcastFrame(const AMQFrame& frame, const ConnectionId& connection) {
     QPID_LOG(trace, "MCAST [" << connection << "] " << frame);
-    // FIXME aconway 2008-09-02: restore queueing.
-    Mutex::ScopedLock l(lock); // FIXME aconway 2008-09-02: review locking.
-    static char buffer[64*1024]; // FIXME aconway 2008-07-04: buffer management or FrameEncoder.
-    Buffer buf(buffer, sizeof(buffer));
-    buf.putOctet(CONTROL);
-    encodePtr(buf, connection.getConnectionPtr());
+    Event e(CONTROL, connection, frame.size());
+    Buffer buf(e);
     frame.encode(buf);
-    iovec iov = { buffer, buf.getPosition() };
-    cpg.mcast(name, &iov, 1);
+    mcastEvent(e);
+}
+
+void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& connection) {
+    QPID_LOG(trace, "MCAST [" << connection << "] " << size << "bytes of data");
+    Event e(DATA, connection, size);
+    memcpy(e.getData(), data, size);
+    mcastEvent(e);
 }
 
-void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& id) {
-    // FIXME aconway 2008-09-02: does this need locking?
-    Mutex::ScopedLock l(lock); // FIXME aconway 2008-09-02: review locking.
-    char hdrbuf[1+sizeof(uint64_t)];
-    Buffer buf(hdrbuf, sizeof(hdrbuf));
-    buf.putOctet(DATA);
-    encodePtr(buf, id.getConnectionPtr());
-    iovec iov[] = { { hdrbuf, buf.getPosition() }, { const_cast<char*>(data), size } };
-    cpg.mcast(name, iov, sizeof(iov)/sizeof(*iov));
+void Cluster::mcastEvent(const Event& e) {
+    QPID_LOG(trace, "Multicasting: " << e);
+    e.mcast(name, cpg);
 }
 
 size_t Cluster::size() const {
@@ -186,6 +182,7 @@
 }
 
 void Cluster::deliverEvent(const Event& e) {
+    QPID_LOG(trace, "Delivered: " << e);
     Buffer buf(e);
     if (e.getConnection().getConnectionPtr() == 0)  { // Cluster control
         AMQFrame frame;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=692654&r1=692653&r2=692654&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Sat Sep  6 07:10:08 2008
@@ -67,10 +67,11 @@
 
     bool empty() const { return size() == 0; }
     
-    /** Send frame to the cluster */
+    /** Send to the cluster */
     void mcastFrame(const framing::AMQFrame&, const ConnectionId&);
     void mcastBuffer(const char*, size_t, const ConnectionId&);
-
+    void mcastEvent(const Event& e);
+    
     /** Leave the cluster */
     void leave();
     

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp?rev=692654&r1=692653&r2=692654&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp Sat Sep  6 07:10:08 2008
@@ -22,13 +22,16 @@
 #include "Event.h"
 #include "Cpg.h"
 #include "qpid/framing/Buffer.h"
+#include <ostream>
+#include <iterator>
+#include <algorithm>
 
 namespace qpid {
 namespace cluster {
 
 using framing::Buffer;
 
-const size_t Event::OVERHEAD = 1 /*type*/ + 8 /*64-bit pointr*/;
+const size_t Event::OVERHEAD = sizeof(uint8_t) + sizeof(uint64_t);
 
 Event::Event(EventType t, const ConnectionId c, const size_t s)
     : type(t), connection(c), size(s), data(RefCountedBuffer::create(s)) {}
@@ -43,15 +46,27 @@
     return e;
 }
     
-void Event::mcast(const Cpg::Name& name, Cpg& cpg) {
+void Event::mcast (const Cpg::Name& name, Cpg& cpg) const {
     char header[OVERHEAD];
-    Buffer b;
+    Buffer b(header, OVERHEAD);
     b.putOctet(type);
     b.putLongLong(reinterpret_cast<uint64_t>(connection.getConnectionPtr()));
-    iovec iov[] = { { header, b.getPosition() }, { data.get(), size } };
+    iovec iov[] = { { header, OVERHEAD }, { const_cast<char*>(getData()), getSize() } };
     cpg.mcast(name, iov, sizeof(iov)/sizeof(*iov));
 }
 
+Event::operator Buffer() const  {
+    return Buffer(const_cast<char*>(getData()), getSize());
+}
 
+static const char* EVENT_TYPE_NAMES[] = { "data", "control" };
+std::ostream& operator << (std::ostream& o, const Event& e) {
+    o << "[event: " << e.getConnection()
+      << " " << EVENT_TYPE_NAMES[e.getType()]
+      << " " << e.getSize() << " bytes: ";
+    std::ostream_iterator<char> oi(o,"");
+    std::copy(e.getData(), e.getData()+std::min(e.getSize(), size_t(16)), oi);
+    return o << "...]";
+}
 
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h?rev=692654&r1=692653&r2=692654&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h Sat Sep  6 07:10:08 2008
@@ -26,6 +26,7 @@
 #include "Cpg.h"
 #include "qpid/RefCountedBuffer.h"
 #include "qpid/framing/Buffer.h"
+#include <iosfwd>
 
 namespace qpid {
 namespace cluster {
@@ -46,24 +47,25 @@
     /** Create an event copied from delivered data. */
     static Event delivered(const MemberId& m, void* data, size_t size);
     
-    void mcast(const Cpg::Name& name, Cpg& cpg);
+    void mcast(const Cpg::Name& name, Cpg& cpg) const;
     
     EventType getType() const { return type; }
     ConnectionId getConnection() const { return connection; }
     size_t getSize() const { return size; }
-    char* getData() { return data->get(); }
-    const char* getData() const { return data->get(); }
+    char* getData() { return data; }
+    const char* getData() const { return data; }
 
-    operator framing::Buffer() const { return framing::Buffer(const_cast<char*>(getData()), getSize()); }
+    operator framing::Buffer() const;
 
   private:
     static const size_t OVERHEAD;
     EventType type;
     ConnectionId connection;
     size_t size;
-    RefCountedBuffer::intrusive_ptr data;
+    RefCountedBuffer::pointer data;
 };
 
+std::ostream& operator << (std::ostream&, const Event&);
 }} // namespace qpid::cluster
 
 #endif  /*!QPID_CLUSTER_EVENT_H*/