You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/09/06 16:10:09 UTC
svn commit: r692654 - in /incubator/qpid/trunk/qpid/cpp/src/qpid:
RefCountedBuffer.cpp RefCountedBuffer.h cluster/Cluster.cpp
cluster/Cluster.h cluster/Event.cpp cluster/Event.h
Author: aconway
Date: Sat Sep 6 07:10:08 2008
New Revision: 692654
URL: http://svn.apache.org/viewvc?rev=692654&view=rev
Log:
RefCountedBuffer improvements, centralize cluster encoding/decoding in Event.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/RefCountedBuffer.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/RefCountedBuffer.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/RefCountedBuffer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/RefCountedBuffer.cpp?rev=692654&r1=692653&r2=692654&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/RefCountedBuffer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/RefCountedBuffer.cpp Sat Sep 6 07:10:08 2008
@@ -34,12 +34,19 @@
return const_cast<char*>(reinterpret_cast<const char*>(this)+sizeof(RefCountedBuffer));
}
-RefCountedBuffer::intrusive_ptr RefCountedBuffer::create(size_t n) {
+RefCountedBuffer::pointer RefCountedBuffer::create(size_t n) {
char* store=::new char[n+sizeof(RefCountedBuffer)];
new(store) RefCountedBuffer;
- return reinterpret_cast<RefCountedBuffer*>(store);
+ return pointer(reinterpret_cast<RefCountedBuffer*>(store));
}
+RefCountedBuffer::pointer::pointer() {}
+RefCountedBuffer::pointer::pointer(RefCountedBuffer* x) : p(x) {}
+RefCountedBuffer::pointer::pointer(const pointer& x) : p(x.p) {}
+RefCountedBuffer::pointer::~pointer() {}
+RefCountedBuffer::pointer& RefCountedBuffer::pointer::operator=(const RefCountedBuffer::pointer& x) { p = x.p; return *this; }
+
+char* RefCountedBuffer::pointer::cp() const { return p ? p->get() : 0; }
} // namespace qpid
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/RefCountedBuffer.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/RefCountedBuffer.h?rev=692654&r1=692653&r2=692654&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/RefCountedBuffer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/RefCountedBuffer.h Sat Sep 6 07:10:08 2008
@@ -27,7 +27,7 @@
#include <boost/intrusive_ptr.hpp>
namespace qpid {
-
+// FIXME aconway 2008-09-06: easy to add alignment
/**
* Reference-counted byte buffer.
* No alignment guarantees.
@@ -39,11 +39,32 @@
char* addr() const;
public:
-
- typedef boost::intrusive_ptr<RefCountedBuffer> intrusive_ptr;
-
+ /** Smart char pointer to a reference counted buffer */
+ class pointer {
+ boost::intrusive_ptr<RefCountedBuffer> p;
+ char* cp() const;
+ pointer(RefCountedBuffer* x);
+ friend class RefCountedBuffer;
+
+ public:
+ pointer();
+ pointer(const pointer&);
+ ~pointer();
+ pointer& operator=(const pointer&);
+
+ char* get() { return cp(); }
+ operator char*() { return cp(); }
+ char& operator*() { return *cp(); }
+ char& operator[](size_t i) { return cp()[i]; }
+
+ const char* get() const { return cp(); }
+ operator const char*() const { return cp(); }
+ const char& operator*() const { return *cp(); }
+ const char& operator[](size_t i) const { return cp()[i]; }
+ };
+
/** Create a reference counted buffer of size n */
- static intrusive_ptr create(size_t n);
+ static pointer create(size_t n);
/** Get a pointer to the start of the buffer. */
char* get() { return addr(); }
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=692654&r1=692653&r2=692654&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Sat Sep 6 07:10:08 2008
@@ -116,26 +116,22 @@
void Cluster::mcastFrame(const AMQFrame& frame, const ConnectionId& connection) {
QPID_LOG(trace, "MCAST [" << connection << "] " << frame);
- // FIXME aconway 2008-09-02: restore queueing.
- Mutex::ScopedLock l(lock); // FIXME aconway 2008-09-02: review locking.
- static char buffer[64*1024]; // FIXME aconway 2008-07-04: buffer management or FrameEncoder.
- Buffer buf(buffer, sizeof(buffer));
- buf.putOctet(CONTROL);
- encodePtr(buf, connection.getConnectionPtr());
+ Event e(CONTROL, connection, frame.size());
+ Buffer buf(e);
frame.encode(buf);
- iovec iov = { buffer, buf.getPosition() };
- cpg.mcast(name, &iov, 1);
+ mcastEvent(e);
+}
+
+void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& connection) {
+ QPID_LOG(trace, "MCAST [" << connection << "] " << size << "bytes of data");
+ Event e(DATA, connection, size);
+ memcpy(e.getData(), data, size);
+ mcastEvent(e);
}
-void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& id) {
- // FIXME aconway 2008-09-02: does this need locking?
- Mutex::ScopedLock l(lock); // FIXME aconway 2008-09-02: review locking.
- char hdrbuf[1+sizeof(uint64_t)];
- Buffer buf(hdrbuf, sizeof(hdrbuf));
- buf.putOctet(DATA);
- encodePtr(buf, id.getConnectionPtr());
- iovec iov[] = { { hdrbuf, buf.getPosition() }, { const_cast<char*>(data), size } };
- cpg.mcast(name, iov, sizeof(iov)/sizeof(*iov));
+void Cluster::mcastEvent(const Event& e) {
+ QPID_LOG(trace, "Multicasting: " << e);
+ e.mcast(name, cpg);
}
size_t Cluster::size() const {
@@ -186,6 +182,7 @@
}
void Cluster::deliverEvent(const Event& e) {
+ QPID_LOG(trace, "Delivered: " << e);
Buffer buf(e);
if (e.getConnection().getConnectionPtr() == 0) { // Cluster control
AMQFrame frame;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=692654&r1=692653&r2=692654&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Sat Sep 6 07:10:08 2008
@@ -67,10 +67,11 @@
bool empty() const { return size() == 0; }
- /** Send frame to the cluster */
+ /** Send to the cluster */
void mcastFrame(const framing::AMQFrame&, const ConnectionId&);
void mcastBuffer(const char*, size_t, const ConnectionId&);
-
+ void mcastEvent(const Event& e);
+
/** Leave the cluster */
void leave();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp?rev=692654&r1=692653&r2=692654&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp Sat Sep 6 07:10:08 2008
@@ -22,13 +22,16 @@
#include "Event.h"
#include "Cpg.h"
#include "qpid/framing/Buffer.h"
+#include <ostream>
+#include <iterator>
+#include <algorithm>
namespace qpid {
namespace cluster {
using framing::Buffer;
-const size_t Event::OVERHEAD = 1 /*type*/ + 8 /*64-bit pointr*/;
+const size_t Event::OVERHEAD = sizeof(uint8_t) + sizeof(uint64_t);
Event::Event(EventType t, const ConnectionId c, const size_t s)
: type(t), connection(c), size(s), data(RefCountedBuffer::create(s)) {}
@@ -43,15 +46,27 @@
return e;
}
-void Event::mcast(const Cpg::Name& name, Cpg& cpg) {
+void Event::mcast (const Cpg::Name& name, Cpg& cpg) const {
char header[OVERHEAD];
- Buffer b;
+ Buffer b(header, OVERHEAD);
b.putOctet(type);
b.putLongLong(reinterpret_cast<uint64_t>(connection.getConnectionPtr()));
- iovec iov[] = { { header, b.getPosition() }, { data.get(), size } };
+ iovec iov[] = { { header, OVERHEAD }, { const_cast<char*>(getData()), getSize() } };
cpg.mcast(name, iov, sizeof(iov)/sizeof(*iov));
}
+Event::operator Buffer() const {
+ return Buffer(const_cast<char*>(getData()), getSize());
+}
+static const char* EVENT_TYPE_NAMES[] = { "data", "control" };
+std::ostream& operator << (std::ostream& o, const Event& e) {
+ o << "[event: " << e.getConnection()
+ << " " << EVENT_TYPE_NAMES[e.getType()]
+ << " " << e.getSize() << " bytes: ";
+ std::ostream_iterator<char> oi(o,"");
+ std::copy(e.getData(), e.getData()+std::min(e.getSize(), size_t(16)), oi);
+ return o << "...]";
+}
}} // namespace qpid::cluster
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h?rev=692654&r1=692653&r2=692654&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h Sat Sep 6 07:10:08 2008
@@ -26,6 +26,7 @@
#include "Cpg.h"
#include "qpid/RefCountedBuffer.h"
#include "qpid/framing/Buffer.h"
+#include <iosfwd>
namespace qpid {
namespace cluster {
@@ -46,24 +47,25 @@
/** Create an event copied from delivered data. */
static Event delivered(const MemberId& m, void* data, size_t size);
- void mcast(const Cpg::Name& name, Cpg& cpg);
+ void mcast(const Cpg::Name& name, Cpg& cpg) const;
EventType getType() const { return type; }
ConnectionId getConnection() const { return connection; }
size_t getSize() const { return size; }
- char* getData() { return data->get(); }
- const char* getData() const { return data->get(); }
+ char* getData() { return data; }
+ const char* getData() const { return data; }
- operator framing::Buffer() const { return framing::Buffer(const_cast<char*>(getData()), getSize()); }
+ operator framing::Buffer() const;
private:
static const size_t OVERHEAD;
EventType type;
ConnectionId connection;
size_t size;
- RefCountedBuffer::intrusive_ptr data;
+ RefCountedBuffer::pointer data;
};
+std::ostream& operator << (std::ostream&, const Event&);
}} // namespace qpid::cluster
#endif /*!QPID_CLUSTER_EVENT_H*/