You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/09/03 05:21:01 UTC
svn commit: r691489 - in /incubator/qpid/trunk/qpid/cpp: src/
src/qpid/amqp_0_10/ src/qpid/cluster/ src/qpid/framing/ src/tests/ xml/
Author: aconway
Date: Tue Sep 2 20:21:00 2008
New Revision: 691489
URL: http://svn.apache.org/viewvc?rev=691489&view=rev
Log:
Cluster multicasts buffers rather than frames.
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.h (with props)
Modified:
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h
incubator/qpid/trunk/qpid/cpp/src/tests/.valgrind.supp
incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h
incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
incubator/qpid/trunk/qpid/cpp/xml/cluster.xml
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=691489&r1=691488&r2=691489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Tue Sep 2 20:21:00 2008
@@ -209,6 +209,7 @@
qpid/framing/FieldTable.cpp \
qpid/framing/FieldValue.cpp \
qpid/framing/FrameSet.cpp \
+ qpid/framing/FrameDecoder.cpp \
qpid/framing/ProtocolInitiation.cpp \
qpid/framing/ProtocolVersion.cpp \
qpid/framing/SendContent.cpp \
@@ -493,7 +494,7 @@
qpid/framing/FieldTable.h \
qpid/framing/FieldValue.h \
qpid/framing/FrameDefaultVisitor.h \
- qpid/framing/FrameHandler.h \
+ qpid/framing/FrameDecoder.h \
qpid/framing/FrameHandler.h \
qpid/framing/FrameSet.h \
qpid/framing/Handler.h \
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp?rev=691489&r1=691488&r2=691489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp Tue Sep 2 20:21:00 2008
@@ -59,7 +59,7 @@
bool Connection::canEncode() {
if (!frameQueueClosed) connection->doOutput();
- Mutex::ScopedLock l(frameQueueLock);
+ Mutex::ScopedLock l(frameQueueLock);
return (!isClient && !initialized) || !frameQueue.empty();
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=691489&r1=691488&r2=691489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Sep 2 20:21:00 2008
@@ -4,7 +4,7 @@
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
-n * You may obtain a copy of the License at
+ * You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
@@ -20,14 +20,17 @@
#include "Connection.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/SessionState.h"
+#include "qpid/broker/Connection.h"
#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/ClusterJoinedBody.h"
+#include "qpid/framing/AMQP_AllOperations.h"
+#include "qpid/framing/AllInvoker.h"
+#include "qpid/framing/ClusterUrlNoticeBody.h"
+#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
+#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
#include "qpid/log/Statement.h"
#include "qpid/memory.h"
#include "qpid/shared_ptr.h"
-#include "qpid/framing/AMQP_AllOperations.h"
-#include "qpid/framing/AllInvoker.h"
-#include "qpid/framing/Invoker.h"
#include <boost/bind.hpp>
#include <boost/cast.hpp>
@@ -38,36 +41,17 @@
namespace qpid {
namespace cluster {
-
using namespace qpid::framing;
using namespace qpid::sys;
using namespace std;
-// Handle cluster controls from a given member.
-struct ClusterOperations : public framing::AMQP_AllOperations::ClusterHandler {
+struct ClusterOperations : public AMQP_AllOperations::ClusterHandler {
Cluster& cluster;
MemberId member;
-
- ClusterOperations(Cluster& c, const MemberId& m) : cluster(c), member(m) {}
-
- void joined(const std::string& url) {
- cluster.joined(member, url);
- }
+ ClusterOperations(Cluster& c, const MemberId& id) : cluster(c), member(id) {}
+ void urlNotice(const std::string& u) { cluster.urlNotice (member, u); }
+ bool invoke(AMQFrame& f) { return framing::invoke(*this, *f.getBody()).wasHandled(); }
};
-
-ostream& operator <<(ostream& out, const Cluster& cluster) {
- return out << cluster.name.str() << "-" << cluster.self;
-}
-
-ostream& operator<<(ostream& out, const Cluster::UrlMap::value_type& m) {
- return out << m.first << " at " << m.second;
-}
-
-ostream& operator <<(ostream& out, const Cluster::UrlMap& urls) {
- ostream_iterator<Cluster::UrlMap::value_type> o(out, " ");
- copy(urls.begin(), urls.end(), o);
- return out;
-}
Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
broker(&b),
@@ -80,30 +64,39 @@
boost::bind(&Cluster::dispatch, this, _1), // read
0, // write
boost::bind(&Cluster::disconnect, this, _1) // disconnect
- ),
- deliverQueue(boost::bind(&Cluster::deliverQueueCb, this, _1, _2)),
- mcastQueue(boost::bind(&Cluster::mcastQueueCb, this, _1, _2))
+ )
{
broker->addFinalizer(boost::bind(&Cluster::leave, this));
- QPID_LOG(trace, "Node " << self << " joining cluster: " << name_);
+ QPID_LOG(trace, "Joining cluster: " << name << " as " << self);
cpg.join(name);
- send(AMQFrame(in_place<ClusterJoinedBody>(ProtocolVersion(), url.str())), ConnectionId(self,0));
+ mcastFrame(AMQFrame(in_place<ClusterUrlNoticeBody>(ProtocolVersion(), url.str())),
+ ConnectionId(self,0));
// Start dispatching from the poller.
cpgDispatchHandle.startWatch(poller);
- deliverQueue.start(poller);
- mcastQueue.start(poller);
}
Cluster::~Cluster() {}
+void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
+ Mutex::ScopedLock l(lock);
+ connections.insert(ConnectionMap::value_type(ConnectionId(self, c.get()), c));
+}
+
+void Cluster::erase(ConnectionId id) {
+ Mutex::ScopedLock l(lock);
+ connections.erase(id);
+}
+
void Cluster::leave() {
Mutex::ScopedLock l(lock);
if (!broker) return; // Already left.
// Leave is called by from Broker destructor after the poller has
// been shut down. No dispatches can occur.
+
+ QPID_LOG(debug, "Leaving cluster " << name.str());
cpg.leave(name);
- // broker is set to 0 when the final config-change is delivered.
+ // broker= is set to 0 when the final config-change is delivered.
while(broker) {
Mutex::ScopedUnlock u(lock);
cpg.dispatchAll();
@@ -121,30 +114,30 @@
buf.putLongLong(value);
}
-void Cluster::send(const AMQFrame& frame, const ConnectionId& id) {
- QPID_LOG(trace, "MCAST [" << id << "] " << frame);
- mcastQueue.push(Message(frame, id));
-}
-
-void Cluster::mcastQueueCb(const MessageQueue::iterator& begin,
- const MessageQueue::iterator& end)
-{
- // Static is OK because there is only one cluster allowed per
- // process and only one thread in mcastQueueCb at a time.
- static char buffer[64*1024]; // FIXME aconway 2008-07-04: buffer management.
+void Cluster::mcastFrame(const AMQFrame& frame, const ConnectionId& connection) {
+ QPID_LOG(trace, "MCAST [" << connection << "] " << frame);
+ // FIXME aconway 2008-09-02: restore queueing.
+ Mutex::ScopedLock l(lock); // FIXME aconway 2008-09-02: review locking.
+ static char buffer[64*1024]; // FIXME aconway 2008-07-04: buffer management or FrameEncoder.
Buffer buf(buffer, sizeof(buffer));
- for (MessageQueue::iterator i = begin; i != end; ++i) {
- AMQFrame& frame =i->first;
- ConnectionId id =i->second;
- if (buf.available() < frame.size() + sizeof(uint64_t))
- break;
- frame.encode(buf);
- encodePtr(buf, id.second);
- }
+ buf.putOctet(CONTROL);
+ encodePtr(buf, connection.getConnectionPtr());
+ frame.encode(buf);
iovec iov = { buffer, buf.getPosition() };
cpg.mcast(name, &iov, 1);
}
+void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& id) {
+ // FIXME aconway 2008-09-02: does this need locking?
+ Mutex::ScopedLock l(lock); // FIXME aconway 2008-09-02: review locking.
+ char hdrbuf[1+sizeof(uint64_t)];
+ Buffer buf(hdrbuf, sizeof(hdrbuf));
+ buf.putOctet(DATA);
+ encodePtr(buf, id.getConnectionPtr());
+ iovec iov[] = { { hdrbuf, buf.getPosition() }, { const_cast<char*>(data), size } };
+ cpg.mcast(name, iov, sizeof(iov)/sizeof(*iov));
+}
+
size_t Cluster::size() const {
Mutex::ScopedLock l(lock);
return urls.size();
@@ -153,19 +146,23 @@
std::vector<Url> Cluster::getUrls() const {
Mutex::ScopedLock l(lock);
std::vector<Url> result(urls.size());
- std::transform(urls.begin(), urls.end(), result.begin(), boost::bind(&UrlMap::value_type::second, _1));
+ std::transform(urls.begin(), urls.end(), result.begin(),
+ boost::bind(&UrlMap::value_type::second, _1));
return result;
}
boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& id) {
- boost::intrusive_ptr<Connection> c = connections[id];
- if (!c && id.first != self) { // Shadow connection
- std::ostringstream os;
- os << id;
- c = connections[id] = new Connection(*this, shadowOut, os.str(), id);
+ if (id.getMember() == self)
+ return boost::intrusive_ptr<Connection>(id.getConnectionPtr());
+ ConnectionMap::iterator i = connections.find(id);
+ if (i == connections.end()) { // New shadow connection.
+ assert(id.getMember() != self);
+ std::ostringstream mgmtId;
+ mgmtId << name << ":" << id;
+ ConnectionMap::value_type value(id, new Connection(*this, shadowOut, mgmtId.str(), id));
+ i = connections.insert(value).first;
}
- assert(c);
- return c;
+ return i->second;
}
void Cluster::deliver(
@@ -176,17 +173,28 @@
void* msg,
int msg_len)
{
- MemberId from(nodeid, pid);
try {
+ MemberId from(nodeid, pid);
Buffer buf(static_cast<char*>(msg), msg_len);
- while (buf.available() > 0) {
+ Connection* connection;
+ uint8_t type = buf.getOctet();
+ decodePtr(buf, connection);
+ if (connection == 0) { // Cluster controls
AMQFrame frame;
- if (!frame.decode(buf)) // Not enough data.
- throw Exception("Received incomplete cluster event.");
- Connection* cp;
- decodePtr(buf, cp);
- QPID_LOG(critical, "deliverQ.push " << frame);
- deliverQueue.push(Message(frame, ConnectionId(from, cp)));
+ while (frame.decode(buf))
+ if (!ClusterOperations(*this, from).invoke(frame))
+ throw Exception("Invalid cluster control");
+ }
+ else { // Connection data or control
+ boost::intrusive_ptr<Connection> c =
+ getConnection(ConnectionId(from, connection));
+ if (type == DATA)
+ c->deliverBuffer(buf);
+ else {
+ AMQFrame frame;
+ while (frame.decode(buf))
+ c->deliver(frame);
+ }
}
}
catch (const std::exception& e) {
@@ -197,59 +205,24 @@
}
}
-void Cluster::deliverQueueCb(const MessageQueue::iterator& begin,
- const MessageQueue::iterator& end)
-{
- for (MessageQueue::iterator i = begin; i != end; ++i) {
- AMQFrame& frame(i->first);
- ConnectionId connectionId(i->second);
- try {
- QPID_LOG(trace, "DLVR [" << connectionId << "]: " << frame);
- if (!broker) {
- QPID_LOG(error, "Unexpected DLVR after leaving the cluster.");
- return;
- }
- if (connectionId.getConnectionPtr()) // Connection control
- getConnection(connectionId)->deliver(frame);
- else { // Cluster control
- ClusterOperations cops(*this, connectionId.getMember());
- bool invoked = framing::invoke(cops, *frame.getBody()).wasHandled();
- assert(invoked);
- }
- }
- catch (const std::exception& e) {
- // FIXME aconway 2008-01-30: exception handling.
- QPID_LOG(critical, "Error in cluster deliverQueueCb: " << e.what());
- assert(0);
- throw;
- }
- }
-}
-
-void Cluster::joined(const MemberId& member, const string& url) {
- Mutex::ScopedLock l(lock);
- QPID_LOG(debug, member << " has URL " << url);
- urls[member] = url;
- lock.notifyAll();
-}
-
void Cluster::configChange(
cpg_handle_t /*handle*/,
cpg_name */*group*/,
- cpg_address */*current*/, int /*nCurrent*/,
+ cpg_address *current, int nCurrent,
cpg_address *left, int nLeft,
- cpg_address *joined, int nJoined)
+ cpg_address */*joined*/, int /*nJoined*/)
{
- QPID_LOG(debug, "Cluster change: " << std::make_pair(joined, nJoined) << std::make_pair(left, nLeft));
+ QPID_LOG(debug, "Cluster change: "
+ << std::make_pair(current, nCurrent)
+ << std::make_pair(left, nLeft));
+
Mutex::ScopedLock l(lock);
- // We add URLs to the map in joined() we don't keep track of pre-URL members yet.
- for (int l = 0; l < nLeft; ++l) urls.erase(left[l]);
+ for (int i = 0; i < nLeft; ++i) urls.erase(left[i]);
+ // Add new members when their URL notice arraives.
- if (std::find(left, left+nLeft, self) != left+nLeft) {
+ if (std::find(left, left+nLeft, self) != left+nLeft)
broker = 0; // We have left the group, this is the final config change.
- QPID_LOG(debug, "Leaving cluster " << *this);
- }
- lock.notifyAll(); // Threads waiting for url changes.
+ lock.notifyAll(); // Threads waiting for membership changes.
}
void Cluster::dispatch(sys::DispatchHandle& h) {
@@ -263,14 +236,8 @@
broker->shutdown();
}
-void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
- Mutex::ScopedLock l(lock);
- connections[c->getId()] = c;
-}
-
-void Cluster::erase(ConnectionId id) {
- Mutex::ScopedLock l(lock);
- connections.erase(id);
+void Cluster::urlNotice(const MemberId& m, const std::string& url) {
+ urls.insert(UrlMap::value_type(m,Url(url)));
}
}} // namespace qpid::cluster
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=691489&r1=691488&r2=691489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Tue Sep 2 20:21:00 2008
@@ -68,17 +68,18 @@
bool empty() const { return size() == 0; }
/** Send frame to the cluster */
- void send(const framing::AMQFrame&, const ConnectionId&);
+ void mcastFrame(const framing::AMQFrame&, const ConnectionId&);
+ void mcastBuffer(const char*, size_t, const ConnectionId&);
/** Leave the cluster */
void leave();
- void joined(const MemberId&, const std::string& url);
+ void urlNotice(const MemberId&, const std::string& url);
broker::Broker& getBroker() { assert(broker); return *broker; }
MemberId getSelf() const { return self; }
-
+
private:
typedef std::map<MemberId, Url> UrlMap;
typedef std::map<ConnectionId, boost::intrusive_ptr<cluster::Connection> > ConnectionMap;
@@ -88,6 +89,11 @@
typedef PollableQueue<Message> MessageQueue;
boost::function<void()> shutdownNext;
+
+ /** Handle a delivered frame */
+ void deliverFrame(framing::AMQFrame&, const ConnectionId&);
+
+ void deliverBuffer(const char*, size_t, const ConnectionId&);
/** CPG deliver callback. */
void deliver(
@@ -107,15 +113,6 @@
struct cpg_address */*joined*/, int /*nJoined*/
);
- /** Callback to handle delivered frames from the deliverQueue. */
- void deliverQueueCb(const MessageQueue::iterator& begin,
- const MessageQueue::iterator& end);
-
- /** Callback to multi-cast frames from mcastQueue */
- void mcastQueueCb(const MessageQueue::iterator& begin,
- const MessageQueue::iterator& end);
-
-
/** Callback to dispatch CPG events. */
void dispatch(sys::DispatchHandle&);
/** Callback if CPG fd is disconnected. */
@@ -136,8 +133,6 @@
ConnectionMap connections;
NoOpConnectionOutputHandler shadowOut;
sys::DispatchHandle cpgDispatchHandle;
- MessageQueue deliverQueue;
- MessageQueue mcastQueue;
friend std::ostream& operator <<(std::ostream&, const Cluster&);
friend std::ostream& operator <<(std::ostream&, const UrlMap::value_type&);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=691489&r1=691488&r2=691489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Tue Sep 2 20:21:00 2008
@@ -46,16 +46,23 @@
Connection::~Connection() {}
-// Forward all received frames to the cluster, continue handling on delivery.
-void Connection::received(framing::AMQFrame& f) {
- cluster.send(f, self);
+void Connection::received(framing::AMQFrame& ) {
+ // FIXME aconway 2008-09-02: not called, codec sends straight to deliver
+ assert(0);
}
-// Don't doOutput in the
-bool Connection::doOutput() { return output.doOutput(); }
+bool Connection::doOutput() { return output.doOutput(); }
+
+// Delivery of doOutput allows us to run the real connection doOutput()
+// which stocks up the write buffers with data.
+//
+void Connection::deliverDoOutput(uint32_t requested) {
+ output.deliverDoOutput(requested);
+}
// Handle frames delivered from cluster.
void Connection::deliver(framing::AMQFrame& f) {
+ QPID_LOG(trace, "DLVR [" << self << "]: " << f);
// Handle connection controls, deliver other frames to connection.
if (!framing::invoke(*this, *f.getBody()).wasHandled())
connection.received(f);
@@ -71,7 +78,8 @@
// handler will be deleted.
//
connection.setOutputHandler(&discardHandler);
- cluster.send(AMQFrame(in_place<ClusterConnectionDeliverCloseBody>()), self);
+ cluster.mcastFrame(AMQFrame(in_place<ClusterConnectionDeliverCloseBody>()), self);
+ ++mcastSeq;
}
catch (const std::exception& e) {
QPID_LOG(error, QPID_MSG("While closing connection: " << e.what()));
@@ -83,11 +91,19 @@
cluster.erase(self);
}
-// Delivery of doOutput allows us to run the real connection doOutput()
-// which stocks up the write buffers with data.
-//
-void Connection::deliverDoOutput(uint32_t requested) {
- output.deliverDoOutput(requested);
+size_t Connection::decode(const char* buffer, size_t size) {
+ QPID_LOG(trace, "mcastBuffer " << self << " " << mcastSeq << " " << size);
+ ++mcastSeq;
+ cluster.mcastBuffer(buffer, size, self);
+ // FIXME aconway 2008-09-01: deserialize?
+ return size;
+}
+
+void Connection::deliverBuffer(Buffer& buf) {
+ QPID_LOG(trace, "deliverBuffer " << self << " " << deliverSeq << " " << buf.available());
+ ++deliverSeq;
+ while (decoder.decode(buf))
+ deliver(decoder.frame); // FIXME aconway 2008-09-01: Queue frames for delivery in separate thread.
}
}} // namespace qpid::cluster
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=691489&r1=691488&r2=691489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Tue Sep 2 20:21:00 2008
@@ -31,6 +31,8 @@
#include "qpid/amqp_0_10/Connection.h"
#include "qpid/sys/ConnectionInputHandler.h"
#include "qpid/sys/ConnectionOutputHandler.h"
+#include "qpid/framing/FrameDecoder.h"
+#include "qpid/framing/SequenceNumber.h"
namespace qpid {
@@ -56,16 +58,16 @@
~Connection();
ConnectionId getId() const { return self; }
+ broker::Connection& getBrokerConnection() { return connection; }
bool isLocal() const { return self.second == this; }
- // self-delivery of intercepted extension points.
+ Cluster& getCluster() { return cluster; }
+
+ // self-delivery of multicast data.
void deliver(framing::AMQFrame& f);
void deliverClose();
void deliverDoOutput(uint32_t requested);
-
- void codecDeleted();
-
- Cluster& getCluster() { return cluster; }
+ void deliverBuffer(framing::Buffer&);
// ConnectionOutputHandler methods
void close() {}
@@ -78,13 +80,15 @@
void closed();
bool doOutput();
bool hasOutput() { return connection.hasOutput(); }
- void idleOut() { idleOut(); }
- void idleIn() { idleIn(); }
+ void idleOut() { connection.idleOut(); }
+ void idleIn() { connection.idleIn(); }
+
+ // ConnectionCodec methods
+ size_t decode(const char* buffer, size_t size);
// ConnectionInputHandlerFactory
sys::ConnectionInputHandler* create(sys::ConnectionOutputHandler* out, const std::string& id, bool isClient);
- broker::Connection& getBrokerConnection() { return connection; }
private:
void sendDoOutput();
@@ -93,7 +97,10 @@
NoOpConnectionOutputHandler discardHandler;
WriteEstimate writeEstimate;
OutputInterceptor output;
+ framing::FrameDecoder decoder;
broker::Connection connection;
+ framing::SequenceNumber mcastSeq;
+ framing::SequenceNumber deliverSeq;
};
}} // namespace qpid::cluster
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp?rev=691489&r1=691488&r2=691489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp Tue Sep 2 20:21:00 2008
@@ -22,6 +22,7 @@
#include "Connection.h"
#include "ProxyInputHandler.h"
#include "qpid/broker/Connection.h"
+#include "qpid/log/Statement.h"
#include "qpid/memory.h"
namespace qpid {
@@ -54,7 +55,11 @@
ConnectionCodec::~ConnectionCodec() {}
// ConnectionCodec functions delegate to the codecOutput
-size_t ConnectionCodec::decode(const char* buffer, size_t size) { return codec.decode(buffer, size); }
+size_t ConnectionCodec::decode(const char* buffer, size_t size) {
+ return interceptor->decode(buffer, size);
+}
+
+// FIXME aconway 2008-09-02: delegate to interceptor?
size_t ConnectionCodec::encode(const char* buffer, size_t size) { return codec.encode(buffer, size); }
bool ConnectionCodec::canEncode() { return codec.canEncode(); }
void ConnectionCodec::closed() { codec.closed(); }
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h?rev=691489&r1=691488&r2=691489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h Tue Sep 2 20:21:00 2008
@@ -58,7 +58,7 @@
ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& c);
~ConnectionCodec();
- // ConnectionCodec functions delegate to the codecOutput
+ // ConnectionCodec functions.
size_t decode(const char* buffer, size_t size);
size_t encode(const char* buffer, size_t size);
bool canEncode();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp?rev=691489&r1=691488&r2=691489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp Tue Sep 2 20:21:00 2008
@@ -183,10 +183,6 @@
return o << c.first << "-" << c.second;
}
-ostream& operator<<(ostream& o, const cpg_name& name) {
- return o << string(name.value, name.length);
-}
-
}} // namespace qpid::cluster
@@ -195,16 +191,18 @@
std::ostream& operator<<(std::ostream& o, const ::cpg_address& a) {
const char* reasonString;
switch (a.reason) {
- case CPG_REASON_JOIN: reasonString = "joined"; break;
- case CPG_REASON_LEAVE: reasonString = "left";break;
- case CPG_REASON_NODEDOWN: reasonString = "node-down";break;
- case CPG_REASON_NODEUP: reasonString = "node-up";break;
- case CPG_REASON_PROCDOWN: reasonString = "process-down";break;
- default:
- assert(0);
- reasonString = "";
+ case CPG_REASON_JOIN: reasonString = " joined"; break;
+ case CPG_REASON_LEAVE: reasonString = " left";break;
+ case CPG_REASON_NODEDOWN: reasonString = " node-down";break;
+ case CPG_REASON_NODEUP: reasonString = " node-up";break;
+ case CPG_REASON_PROCDOWN: reasonString = " process-down";break;
+ default: reasonString = "";
}
- return o << qpid::cluster::MemberId(a.nodeid, a.pid) << " " << reasonString;
+ return o << qpid::cluster::MemberId(a.nodeid, a.pid) << reasonString;
+}
+
+std::ostream& operator<<(std::ostream& o, const cpg_name& name) {
+ return o << std::string(name.value, name.length);
}
namespace std {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=691489&r1=691488&r2=691489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Tue Sep 2 20:21:00 2008
@@ -97,7 +97,7 @@
// Send it anyway to keep the doOutput chain going until we are sure there's no more output
// (in deliverDoOutput)
//
- parent.getCluster().send(AMQFrame(in_place<ClusterConnectionDeliverDoOutputBody>(
+ parent.getCluster().mcastFrame(AMQFrame(in_place<ClusterConnectionDeliverDoOutputBody>(
framing::ProtocolVersion(), request)), parent.getId());
QPID_LOG(trace, &parent << "Send doOutput request for " << request);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h?rev=691489&r1=691488&r2=691489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h Tue Sep 2 20:21:00 2008
@@ -34,6 +34,9 @@
class Connection;
+/** Types of cluster messages. */
+enum EventType { DATA, CONTROL };
+
/** first=node-id, second=pid */
struct MemberId : std::pair<uint32_t, uint32_t> {
MemberId(uint32_t node=0, uint32_t pid=0) : std::pair<uint32_t,uint32_t>(node, pid) {}
@@ -51,6 +54,7 @@
MemberId getMember() const { return first; }
Connection* getConnectionPtr() const { return second; }
};
+
std::ostream& operator<<(std::ostream&, const ConnectionId&);
}} // namespace qpid::cluster
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp?rev=691489&r1=691488&r2=691489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp Tue Sep 2 20:21:00 2008
@@ -45,6 +45,13 @@
return 12 /*frame header*/;
}
+uint16_t AMQFrame::DECODE_SIZE_MIN=4;
+
+uint16_t AMQFrame::decodeSize(char* data) {
+ Buffer buf(data+2, DECODE_SIZE_MIN);
+ return buf.getShort();
+}
+
void AMQFrame::encode(Buffer& buffer) const
{
//set track first (controls on track 0, everything else on 1):
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h?rev=691489&r1=691488&r2=691489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h Tue Sep 2 20:21:00 2008
@@ -104,7 +104,10 @@
bool getEos() const { return eos; }
void setEos(bool isEos) { eos = isEos; }
+ static uint16_t DECODE_SIZE_MIN;
static uint32_t frameOverhead();
+ /** Must point to at least DECODE_SIZE_MIN bytes of data */
+ static uint16_t decodeSize(char* data);
private:
void init() { bof = eof = bos = eos = true; subchannel=0; channel=0; }
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h?rev=691489&r1=691488&r2=691489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h Tue Sep 2 20:21:00 2008
@@ -74,6 +74,7 @@
uint32_t getSize() { return size; }
uint32_t getPosition() { return position; }
Iterator getIterator() { return Iterator(*this); }
+ char* getPointer() { return data; }
void putOctet(uint8_t i);
void putShort(uint16_t i);
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.cpp?rev=691489&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.cpp Tue Sep 2 20:21:00 2008
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "FrameDecoder.h"
+#include "Buffer.h"
+#include "qpid/log/Statement.h"
+#include <algorithm>
+
+namespace qpid {
+namespace framing {
+
+namespace {
+/** Move up to n bytes from start of buf to end of bytes. */
+void move(std::vector<char>& bytes, Buffer& buffer, size_t n) {
+ size_t oldSize = bytes.size();
+ n = std::min(n, size_t(buffer.available()));
+ bytes.resize(oldSize+n);
+ char* p = &bytes[oldSize];
+ buffer.getRawData(reinterpret_cast<uint8_t*>(p), n);
+}
+}
+
+bool FrameDecoder::decode(Buffer& buffer) {
+ if (buffer.available() == 0) return false;
+ if (fragment.empty()) {
+ if (frame.decode(buffer)) // Decode from buffer
+ return true;
+ else // Store fragment
+ move(fragment, buffer, buffer.available());
+ }
+ else { // Already have a fragment
+ // Get enough data to decode the frame size.
+ if (fragment.size() < AMQFrame::DECODE_SIZE_MIN) {
+ move(fragment, buffer, AMQFrame::DECODE_SIZE_MIN - fragment.size());
+ }
+ if (fragment.size() >= AMQFrame::DECODE_SIZE_MIN) {
+ uint16_t size = AMQFrame::decodeSize(&fragment[0]);
+ assert(size > fragment.size());
+ move(fragment, buffer, size-fragment.size());
+ Buffer b(&fragment[0], fragment.size());
+ if (frame.decode(b)) {
+ assert(b.available() == 0);
+ fragment.clear();
+ return true;
+ }
+ }
+ }
+ return false;
+}
+
+}} // namespace qpid::framing
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.h?rev=691489&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.h Tue Sep 2 20:21:00 2008
@@ -0,0 +1,44 @@
+#ifndef QPID_FRAMING_FRAMEDECODER_H
+#define QPID_FRAMING_FRAMEDECODER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "AMQFrame.h"
+
+namespace qpid {
+namespace framing {
+
+/**
+ * Decode a frame from buffer. If buffer does not contain a complete
+ * frame, caches the fragment for the next call to decode.
+ */
+class FrameDecoder
+{
+ public:
+ bool decode(Buffer& buffer);
+ AMQFrame frame;
+ private:
+ std::vector<char> fragment;
+};
+}} // namespace qpid::framing
+
+#endif /*!QPID_FRAMING_FRAMEDECODER_H*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/.valgrind.supp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/.valgrind.supp?rev=691489&r1=691488&r2=691489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/.valgrind.supp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/.valgrind.supp Tue Sep 2 20:21:00 2008
@@ -193,9 +193,10 @@
}
{
- CPG related errors - seem benign but should invesgitate.
+ CPG error - seems benign.
Memcheck:Param
socketcall.sendmsg(msg.msg_iov[i])
- fun:sendmsg
- obj:/usr/lib/openais/libcpg.so.2.0.0
+ obj:*
+ obj:*/libcpg.so.2.0.0
}
+
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h?rev=691489&r1=691488&r2=691489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h Tue Sep 2 20:21:00 2008
@@ -85,7 +85,10 @@
::close(pipeFds[1]);
FILE* f = ::fdopen(pipeFds[0], "r");
if (!f) throw ErrnoException("fopen failed");
- if (::fscanf(f, "%d", &port) != 1) throw ErrnoException("ill-formatted port");
+ if (::fscanf(f, "%d", &port) != 1) {
+ if (ferror(f)) throw ErrnoException("Error reading port number from child.");
+ else throw qpid::Exception("EOF reading port number from child.");
+ }
}
else { // child
::close(pipeFds[0]);
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=691489&r1=691488&r2=691489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Tue Sep 2 20:21:00 2008
@@ -27,6 +27,7 @@
#include "qpid/client/Connection.h"
#include "qpid/client/Session.h"
#include "qpid/framing/Uuid.h"
+#include "qpid/log/Logger.h"
#include <boost/bind.hpp>
#include <boost/ptr_container/ptr_vector.hpp>
@@ -87,7 +88,7 @@
void ClusterFixture::add() {
std::ostringstream os;
- os << "broker" << size();
+ os << "fork" << size();
std::string prefix = os.str();
const char* argv[] = {
@@ -105,6 +106,7 @@
}
else { // First broker, run in this process.
Broker::Options opts;
+ qpid::log::Logger::instance().setPrefix("main");
Plugin::addOptions(opts); // Pick up cluster options.
opts.parse(argc, argv, "", true); // Allow-unknown for --load-module
broker0.reset(new BrokerFixture(opts));
@@ -144,7 +146,8 @@
ClusterFixture cluster(1);
Client c(cluster[0]);
BOOST_CHECK(c.session.queueQuery("q").getQueue().empty());
- BOOST_CHECK(c.session.exchangeQuery("ex").getType().empty());
+ BOOST_CHECK(c.session.exchangeQuery("ex").getType().empty());
+ // FIXME aconway 2008-09-01: leaks if aisexec not running, investigate.
}
QPID_AUTO_TEST_CASE(testWiringReplication) {
Modified: incubator/qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=691489&r1=691488&r2=691489&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ incubator/qpid/trunk/qpid/cpp/xml/cluster.xml Tue Sep 2 20:21:00 2008
@@ -1,4 +1,4 @@
-<?xml version="1.0"?>
+o<?xml version="1.0"?>
<!--
-
- Licensed to the Apache Software Foundation (ASF) under one
@@ -25,8 +25,8 @@
<class name = "cluster" code = "0x80" label="Qpid clustering extensions.">
<doc>Qpid extension class to allow clustered brokers to communicate.</doc>
- <control name = "joined" code="0x1">
- <field name="url" type="str16" />
+ <control name = "url-notice" code="0x1">
+ <field name="url-notice" type="str16" />
</control>
</class>