You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/03/03 00:30:09 UTC
svn commit: r749473 - in /qpid/trunk/qpid/cpp: src/ src/qpid/cluster/
src/qpid/framing/ src/tests/ xml/
Author: aconway
Date: Mon Mar 2 23:30:08 2009
New Revision: 749473
URL: http://svn.apache.org/viewvc?rev=749473&view=rev
Log:
Replicate connection decoder fragments to new members.
Refactoring:
- Merge Decoder into ConnectionMap.
- Process cluster controls in event queue thread.
- Use counter not pointer for connection ID, avoid re-use.
- Do all processing in event queue thread to avoid races
(temporary pending performance measurements)
Removed:
qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionDecoder.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionDecoder.h
qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h
Modified:
qpid/trunk/qpid/cpp/src/cluster.mk
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h
qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h
qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h
qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h
qpid/trunk/qpid/cpp/src/qpid/cluster/types.h
qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.cpp
qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.h
qpid/trunk/qpid/cpp/src/tests/ClusterFixture.cpp
qpid/trunk/qpid/cpp/src/tests/FrameDecoder.cpp
qpid/trunk/qpid/cpp/xml/cluster.xml
Modified: qpid/trunk/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/cluster.mk?rev=749473&r1=749472&r2=749473&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ qpid/trunk/qpid/cpp/src/cluster.mk Mon Mar 2 23:30:08 2009
@@ -53,10 +53,6 @@
qpid/cluster/ConnectionMap.cpp \
qpid/cluster/Cpg.cpp \
qpid/cluster/Cpg.h \
- qpid/cluster/Decoder.cpp \
- qpid/cluster/Decoder.h \
- qpid/cluster/ConnectionDecoder.cpp \
- qpid/cluster/ConnectionDecoder.h \
qpid/cluster/Dispatchable.h \
qpid/cluster/UpdateClient.cpp \
qpid/cluster/UpdateClient.h \
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=749473&r1=749472&r2=749473&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Mon Mar 2 23:30:08 2009
@@ -22,6 +22,7 @@
#include "UpdateClient.h"
#include "FailoverExchange.h"
+#include "qpid/assert.h"
#include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h"
#include "qmf/org/apache/qpid/cluster/Package.h"
#include "qpid/broker/Broker.h"
@@ -91,7 +92,7 @@
cpg(*this),
name(settings.name),
myUrl(settings.url.empty() ? Url() : Url(settings.url)),
- myId(cpg.self()),
+ self(cpg.self()),
readMax(settings.readMax),
writeEstimate(settings.writeEstimate),
mcast(cpg, poller, boost::bind(&Cluster::leave, this)),
@@ -104,8 +105,7 @@
boost::bind(&Cluster::leave, this),
"Error delivering frames",
poller),
- decoder(boost::bind(&PollableFrameQueue::push, &deliverFrameQueue, _1), connections),
- expiryPolicy(new ExpiryPolicy(boost::bind(&Cluster::isLeader, this), mcast, myId, broker.getTimer())),
+ expiryPolicy(new ExpiryPolicy(boost::bind(&Cluster::isLeader, this), mcast, self, broker.getTimer())),
frameId(0),
initialized(false),
state(INIT),
@@ -213,7 +213,7 @@
framing::Buffer buf(static_cast<char*>(msg), msg_len);
Event e(Event::decodeCopy(from, buf));
e.setSequence(sequence++);
- if (from == myId) // Record self-deliveries for flow control.
+ if (from == self) // Record self-deliveries for flow control.
mcast.selfDeliver(e);
deliver(e);
}
@@ -227,42 +227,33 @@
// Handler for deliverEventQueue
void Cluster::deliveredEvent(const Event& e) {
QPID_LATENCY_RECORD("delivered event queue", e);
- Buffer buf(const_cast<char*>(e.getData()), e.getSize());
- if (e.getType() == CONTROL) {
- AMQFrame frame;
- while (frame.decode(buf)) {
- // Check for deliver close here so we can erase the
- // connection decoder safely in this thread.
- if (frame.getMethod()->isA<ClusterConnectionDeliverCloseBody>())
- decoder.erase(e.getConnectionId());
- deliverFrameQueue.push(EventFrame(e, frame));
- }
+ Mutex::ScopedLock l(lock);
+ if (e.isCluster()) { // Cluster control, process in this thread.
+ AMQFrame frame(e.getFrame());
+ ClusterDispatcher dispatch(*this, e.getConnectionId().getMember(), l);
+ if (!framing::invoke(dispatch, *frame.getBody()).wasHandled())
+ throw Exception(QPID_MSG("Invalid cluster control"));
}
- else if (e.getType() == DATA)
- decoder.decode(e, e.getData());
+ else if (state >= CATCHUP) { // Connection frame, push onto deliver queue.
+ if (e.getType() == CONTROL)
+ connectionFrame(EventFrame(e, e.getFrame()));
+ else
+ connections.decode(e, e.getData());
+ }
+ else // connection frame && state < CATCHUP. Drop.
+ QPID_LOG(trace, *this << " DROP: " << e);
}
// Handler for deliverFrameQueue
void Cluster::deliveredFrame(const EventFrame& e) {
- Mutex::ScopedLock l(lock);
- const_cast<AMQFrame&>(e.frame).setClusterId(frameId++);
+ Mutex::ScopedLock l(lock); // TODO aconway 2009-03-02: don't need this lock?
+ assert(!e.isCluster()); // Only connection frames on this queue.
QPID_LOG(trace, *this << " DLVR: " << e);
- QPID_LATENCY_RECORD("delivered frame queue", e.frame);
- if (e.isCluster()) { // Cluster control frame
- ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l);
- if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled())
- throw Exception(QPID_MSG("Invalid cluster control"));
- }
- else { // Connection frame.
- if (state <= UPDATEE) {
- QPID_LOG(trace, *this << " DROP: " << e);
- return;
- }
- boost::intrusive_ptr<Connection> connection = connections.get(e.connectionId);
- if (connection) // Ignore frames to closed local connections.
- connection->deliveredFrame(e);
- }
- QPID_LATENCY_RECORD("processed", e.frame);
+ if (e.type == DATA) // Sequence number to identify data frames.
+ const_cast<AMQFrame&>(e.frame).setClusterId(frameId++);
+ boost::intrusive_ptr<Connection> connection = connections.get(e.connectionId);
+ if (connection) // Ignore frames to closed local connections.
+ connection->deliveredFrame(e);
}
struct AddrList {
@@ -310,7 +301,7 @@
std::string addresses;
for (cpg_address* p = current; p < current+nCurrent; ++p)
addresses.append(MemberId(*p).str());
- deliver(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), myId));
+ deliver(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), self));
}
void Cluster::setReady(Lock&) {
@@ -323,7 +314,7 @@
bool memberChange = map.configChange(addresses);
if (state == LEFT) return;
- if (!map.isAlive(myId)) { // Final config change.
+ if (!map.isAlive(self)) { // Final config change.
leave(l);
return;
}
@@ -332,16 +323,16 @@
if (map.aliveCount() == 1) {
setClusterId(true);
setReady(l);
- map = ClusterMap(myId, myUrl, true);
+ map = ClusterMap(self, myUrl, true);
memberUpdate(l);
QPID_LOG(notice, *this << " first in cluster");
}
else { // Joining established group.
state = JOINER;
QPID_LOG(info, *this << " joining cluster: " << map);
- mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), myId);
+ mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
elders = map.getAlive();
- elders.erase(myId);
+ elders.erase(self);
broker.getLinks().setPassive(true);
}
}
@@ -361,7 +352,7 @@
if (state == READY && map.isJoiner(id)) {
state = OFFER;
QPID_LOG(info, *this << " send update-offer to " << id);
- mcast.mcastControl(ClusterUpdateOfferBody(ProtocolVersion(), id, clusterId), myId);
+ mcast.mcastControl(ClusterUpdateOfferBody(ProtocolVersion(), id, clusterId), self);
}
}
@@ -388,17 +379,29 @@
void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
if (map.ready(id, Url(url)))
memberUpdate(l);
- if (state == CATCHUP && id == myId) {
+ if (state == CATCHUP && id == self) {
setReady(l);
QPID_LOG(notice, *this << " caught up, active cluster member");
}
}
+void Cluster::stall(Lock&) {
+ // Stop processing the deliveredEventQueue in order to send or
+ // recieve an update.
+ deliverEventQueue.stop();
+}
+
+void Cluster::unstall(Lock&) {
+ // Stop processing the deliveredEventQueue in order to send or
+ // recieve an update.
+ deliverEventQueue.start();
+}
+
void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uuid& uuid, Lock& l) {
if (state == LEFT) return;
MemberId updatee(updateeInt);
boost::optional<Url> url = map.updateOffer(updater, updatee);
- if (updater == myId) {
+ if (updater == self) {
assert(state == OFFER);
if (url) { // My offer was first.
updateStart(updatee, *url, l);
@@ -409,29 +412,29 @@
makeOffer(map.firstJoiner(), l); // Maybe make another offer.
}
}
- else if (updatee == myId && url) {
+ else if (updatee == self && url) {
assert(state == JOINER);
setClusterId(uuid);
state = UPDATEE;
QPID_LOG(info, *this << " receiving update from " << updater);
- deliverFrameQueue.stop();
+ stall(l);
checkUpdateIn(l);
}
}
-void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock&) {
+void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) {
if (state == LEFT) return;
assert(state == OFFER);
state = UPDATER;
QPID_LOG(info, *this << " stall for update to " << updatee << " at " << url);
- deliverFrameQueue.stop();
+ stall(l);
if (updateThread.id()) updateThread.join(); // Join the previous updatethread.
client::ConnectionSettings cs;
cs.username = settings.username;
cs.password = settings.password;
cs.mechanism = settings.mechanism;
updateThread = Thread(
- new UpdateClient(myId, updatee, url, broker, map, frameId, connections.values(),
+ new UpdateClient(self, updatee, url, broker, map, frameId, connections.values(),
boost::bind(&Cluster::updateOutDone, this),
boost::bind(&Cluster::updateOutError, this, _1),
cs));
@@ -445,13 +448,13 @@
checkUpdateIn(l);
}
-void Cluster::checkUpdateIn(Lock& ) {
+void Cluster::checkUpdateIn(Lock& l) {
if (state == UPDATEE && updatedMap) {
map = *updatedMap;
- mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), myId);
+ mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
state = CATCHUP;
QPID_LOG(info, *this << " received update, starting catch-up");
- deliverFrameQueue.start();
+ unstall(l);
}
}
@@ -465,7 +468,7 @@
assert(state == UPDATER);
state = READY;
mcast.release();
- deliverFrameQueue.start();
+ unstall(l);
makeOffer(map.firstJoiner(), l); // Try another offer
}
@@ -490,7 +493,7 @@
{
_qmf::ArgsClusterStopClusterNode& iargs = (_qmf::ArgsClusterStopClusterNode&) args;
stringstream stream;
- stream << myId;
+ stream << self;
if (iargs.i_brokerId == stream.str())
stopClusterNode(l);
}
@@ -511,7 +514,7 @@
void Cluster::stopFullCluster(Lock& ) {
QPID_LOG(notice, *this << " shutting down cluster " << name);
- mcast.mcastControl(ClusterShutdownBody(), myId);
+ mcast.mcastControl(ClusterShutdownBody(), self);
}
void Cluster::memberUpdate(Lock& l) {
@@ -522,12 +525,12 @@
failoverExchange->setUrls(urls);
if (size == 1 && lastSize > 1 && state >= CATCHUP) {
- QPID_LOG(info, *this << " last broker standing, update queue policies");
+ QPID_LOG(notice, *this << " last broker standing, update queue policies");
lastBroker = true;
broker.getQueues().updateQueueClusterState(true);
}
else if (size > 1 && lastBroker) {
- QPID_LOG(info, *this << " last broker standing joined by " << size-1 << " replicas, updating queue policies" << size);
+ QPID_LOG(notice, *this << " last broker standing joined by " << size-1 << " replicas, updating queue policies" << size);
lastBroker = false;
broker.getQueues().updateQueueClusterState(false);
}
@@ -549,17 +552,25 @@
mgmtObject->set_memberIDs(idstr);
}
- // Close connections belonging to members that have now been excluded
- connections.update(myId, map);
+ // Generate a deliver-close control frame for connections
+ // belonging to defunct members, so they will be erased in the
+ // deliverFrameQueue thread.
+ ConnectionMap::Vector c = connections.values();
+ for (ConnectionMap::Vector::iterator i = c.begin(); i != c.end(); ++i) {
+ ConnectionId cid = (*i)->getId();
+ MemberId mid = cid.getMember();
+ if (mid != self && !map.isMember(mid))
+ connectionFrame(EventFrame(EventHeader(CONTROL, cid), AMQFrame(ClusterConnectionDeliverCloseBody())));
+ }
}
std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
static const char* STATE[] = { "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT" };
- return o << cluster.myId << "(" << STATE[cluster.state] << ")";
+ return o << cluster.self << "(" << STATE[cluster.state] << ")";
}
MemberId Cluster::getId() const {
- return myId; // Immutable, no need to lock.
+ return self; // Immutable, no need to lock.
}
broker::Broker& Cluster::getBroker() const {
@@ -578,7 +589,7 @@
clusterId = uuid;
if (mgmtObject) {
stringstream stream;
- stream << myId;
+ stream << self;
mgmtObject->set_clusterID(clusterId.str());
mgmtObject->set_memberID(stream.str());
}
@@ -589,4 +600,11 @@
expiryPolicy->deliverExpire(id);
}
+void Cluster::connectionFrame(const EventFrame& frame) {
+ // FIXME aconway 2009-03-02: bypassing deliverFrameQueue to avoid race condition.
+ // Measure performance impact, restore with better locking.
+ // deliverFrameQueue.push(frame);
+ deliveredFrame(frame);
+}
+
}} // namespace qpid::cluster
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=749473&r1=749472&r2=749473&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Mon Mar 2 23:30:08 2009
@@ -30,7 +30,6 @@
#include "NoOpConnectionOutputHandler.h"
#include "PollerDispatch.h"
#include "Quorum.h"
-#include "Decoder.h"
#include "PollableQueue.h"
#include "ExpiryPolicy.h"
@@ -102,7 +101,10 @@
size_t getWriteEstimate() { return writeEstimate; }
bool isLeader() const; // Called in deliver thread.
-
+
+ // Called by Connection in deliver event thread with decoded connection data frames.
+ void connectionFrame(const EventFrame&);
+
private:
typedef sys::Monitor::ScopedLock Lock;
@@ -125,7 +127,7 @@
void brokerShutdown();
// Cluster controls implement XML methods from cluster.xml.
- // Called in deliver thread.
+ // Called in deliveredEvent thread.
//
void updateRequest(const MemberId&, const std::string&, Lock&);
void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, Lock&);
@@ -134,6 +136,10 @@
void messageExpired(const MemberId&, uint64_t, Lock& l);
void shutdown(const MemberId&, Lock&);
+ // Used by cluster controls.
+ void stall(Lock&);
+ void unstall(Lock&);
+
// Handlers for pollable queues.
void deliveredEvent(const Event&);
void deliveredFrame(const EventFrame&);
@@ -141,6 +147,10 @@
// Helper, called in deliver thread.
void updateStart(const MemberId& updatee, const Url& url, Lock&);
+ // Called in event deliver thread to check for update status.
+ bool isUpdateComplete(const EventFrame&);
+ bool isUpdateComplete();
+
void setReady(Lock&);
void deliver( // CPG deliver callback.
@@ -186,7 +196,7 @@
Cpg cpg;
const std::string name;
Url myUrl;
- const MemberId myId;
+ const MemberId self;
const size_t readMax;
const size_t writeEstimate;
framing::Uuid clusterId;
@@ -201,9 +211,6 @@
boost::shared_ptr<FailoverExchange> failoverExchange;
Quorum quorum;
- // Used only in deliverdEvent thread
- Decoder decoder;
-
// Used only in deliveredFrame thread
ClusterMap::Set elders;
boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=749473&r1=749472&r2=749473&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Mon Mar 2 23:30:08 2009
@@ -40,6 +40,7 @@
#include "qpid/framing/ConnectionCloseOkBody.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/LatencyMetric.h"
+#include "qpid/sys/AtomicValue.h"
#include <boost/current_function.hpp>
@@ -58,19 +59,22 @@
NoOpConnectionOutputHandler Connection::discardHandler;
-// Shadow connections
-Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
- const std::string& wrappedId, ConnectionId myId)
- : cluster(c), self(myId), catchUp(false), output(*this, out),
- connection(&output, cluster.getBroker(), wrappedId), expectProtocolHeader(false),
+namespace {
+sys::AtomicValue<uint64_t> idCounter;
+}
+
+// Shadow connection
+Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& logId, const ConnectionId& id)
+ : cluster(c), self(id), catchUp(false), output(*this, out),
+ connection(&output, cluster.getBroker(), logId), expectProtocolHeader(false),
mcastFrameHandler(cluster.getMulticast(), self)
{ init(); }
-// Local connections
+// Local connection
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
- const std::string& wrappedId, MemberId myId, bool isCatchUp, bool isLink)
- : cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out),
- connection(&output, cluster.getBroker(), wrappedId, isLink, catchUp ? ++catchUpId : 0),
+ const std::string& logId, MemberId member, bool isCatchUp, bool isLink)
+ : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), output(*this, out),
+ connection(&output, cluster.getBroker(), logId, isLink, catchUp ? ++catchUpId : 0),
expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self)
{ init(); }
@@ -149,12 +153,9 @@
if (!framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol.
&& !checkUnsupported(*f.frame.getBody())) // Unsupported operation.
{
- // FIXME aconway 2009-02-24: Using the DATA/CONTROL
- // distinction to distinguish incoming vs. outgoing frames is
- // very unclear.
if (f.type == DATA) // incoming data frames to broker::Connection
connection.received(const_cast<AMQFrame&>(f.frame));
- else { // outgoing data frame, send via SessionState
+ else { // frame control, send frame via SessionState
broker::SessionState* ss = connection.getChannel(f.frame.getChannel()).getSession();
if (ss) ss->out(const_cast<AMQFrame&>(f.frame));
}
@@ -200,12 +201,12 @@
connection.closed();
}
-// Decode data from local clients.
+// ConnectoinCodec::decode receives read buffers from directly-connected clients.
size_t Connection::decode(const char* buffer, size_t size) {
if (catchUp) { // Handle catch-up locally.
Buffer buf(const_cast<char*>(buffer), size);
while (localDecoder.decode(buf))
- received(localDecoder.frame);
+ received(localDecoder.getFrame());
}
else { // Multicast local connections.
assert(isLocal());
@@ -233,6 +234,29 @@
return size;
}
+// Decode a data event, a read buffer that has been delivered by the cluster.
+void Connection::decode(const EventHeader& eh, const void* data) {
+ assert(eh.getType() == DATA); // Only handle connection data events.
+ const char* cp = static_cast<const char*>(data);
+ Buffer buf(const_cast<char*>(cp), eh.getSize());
+ if (clusterDecoder.decode(buf)) { // Decoded a frame
+ AMQFrame frame(clusterDecoder.getFrame());
+ while (clusterDecoder.decode(buf)) {
+ cluster.connectionFrame(EventFrame(eh, frame));
+ frame = clusterDecoder.getFrame();
+ }
+ // Set read-credit on the last frame ending in this event.
+ // Credit will be given when this frame is processed.
+ cluster.connectionFrame(EventFrame(eh, frame, 1));
+ }
+ else {
+ // We must give 1 unit read credit per event.
+ // This event does not complete any frames so
+ // we give read credit directly.
+ giveReadCredit(1);
+ }
+}
+
broker::SessionState& Connection::sessionState() {
return *connection.getChannel(currentChannel).getSession();
}
@@ -267,11 +291,12 @@
QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId());
}
-void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username) {
- ConnectionId shadow = ConnectionId(memberId, connectionId);
- QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadow);
- self = shadow;
+void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username, const string& fragment) {
+ ConnectionId shadowId = ConnectionId(memberId, connectionId);
+ QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadowId);
+ self = shadowId;
connection.setUserId(username);
+ clusterDecoder.setFragment(fragment.data(), fragment.size());
}
void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameId) {
@@ -281,7 +306,7 @@
}
bool Connection::isLocal() const {
- return self.first == cluster.getId() && self.second == this;
+ return self.first == cluster.getId() && self.second;
}
bool Connection::isShadow() const {
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=749473&r1=749472&r2=749473&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Mon Mar 2 23:30:08 2009
@@ -64,10 +64,10 @@
public:
typedef sys::PollableQueue<EventFrame> PollableFrameQueue;
- /** Local connection, use this in ConnectionId */
- Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, MemberId, bool catchUp, bool isLink);
- /** Shadow connection */
- Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, ConnectionId);
+ /** Local connection. */
+ Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& logId, MemberId, bool catchUp, bool isLink);
+ /** Shadow connection. */
+ Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& logId, const ConnectionId& id);
~Connection();
ConnectionId getId() const { return self; }
@@ -100,9 +100,12 @@
/** Called if the connectors member has left the cluster */
void left();
- // ConnectionCodec methods
+ // ConnectionCodec methods - called by IO layer with a read buffer.
size_t decode(const char* buffer, size_t size);
+ // Decode a data event, a read buffer that has been delivered by the cluster.
+ void decode(const EventHeader& eh, const void* data);
+
// Called for data delivered from the cluster.
void deliveredFrame(const EventFrame&);
@@ -118,7 +121,7 @@
const framing::SequenceNumber& received,
const framing::SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete);
- void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username);
+ void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment);
void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameId);
@@ -149,7 +152,9 @@
void exchange(const std::string& encoded);
void giveReadCredit(int credit);
-
+
+ framing::FrameDecoder& getDecoder() { return clusterDecoder; }
+
private:
struct NullFrameHandler : public framing::FrameHandler {
void handle(framing::AMQFrame&) {}
@@ -174,6 +179,7 @@
WriteEstimate writeEstimate;
OutputInterceptor output;
framing::FrameDecoder localDecoder;
+ framing::FrameDecoder clusterDecoder;
broker::Connection connection;
framing::SequenceNumber deliverSeq;
framing::ChannelId currentChannel;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp?rev=749473&r1=749472&r2=749473&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp Mon Mar 2 23:30:08 2009
@@ -46,16 +46,13 @@
// Used for outgoing Link connections, we don't care.
sys::ConnectionCodec*
-ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& id) {
- return new ConnectionCodec(out, id, cluster, false, true);
- //return next->create(out, id);
+ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& logId) {
+ return new ConnectionCodec(out, logId, cluster, false, true);
}
-ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& cluster, bool catchUp, bool isLink)
- : codec(out, id, isLink),
- interceptor(new Connection(cluster, codec, id, cluster.getId(), catchUp, isLink)),
- id(interceptor->getId()),
- localId(id)
+ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& logId, Cluster& cluster, bool catchUp, bool isLink)
+ : codec(out, logId, isLink),
+ interceptor(new Connection(cluster, codec, logId, cluster.getId(), catchUp, isLink))
{
std::auto_ptr<sys::ConnectionInputHandler> ih(new ProxyInputHandler(interceptor));
codec.setInputHandler(ih);
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h?rev=749473&r1=749472&r2=749473&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h Mon Mar 2 23:30:08 2009
@@ -56,7 +56,7 @@
sys::ConnectionCodec* create(sys::OutputControl&, const std::string& id);
};
- ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& c, bool catchUp, bool isLink);
+ ConnectionCodec(sys::OutputControl& out, const std::string& logId, Cluster& c, bool catchUp, bool isLink);
~ConnectionCodec();
// ConnectionCodec functions.
@@ -71,8 +71,6 @@
private:
amqp_0_10::Connection codec;
boost::intrusive_ptr<cluster::Connection> interceptor;
- cluster::ConnectionId id;
- std::string localId;
};
}} // namespace qpid::cluster
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp?rev=749473&r1=749472&r2=749473&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp Mon Mar 2 23:30:08 2009
@@ -38,9 +38,9 @@
void ConnectionMap::erase(const ConnectionId& id) {
Lock l(lock);
- Map::iterator i = map.find(id);
- QPID_ASSERT(i != map.end());
- map.erase(i);
+ size_t erased = map.erase(id);
+ assert(erased);
+ (void)erased; // Avoid unused variable warnings.
}
ConnectionMap::ConnectionPtr ConnectionMap::get(const ConnectionId& id) {
@@ -61,13 +61,6 @@
return i->second;
}
-ConnectionMap::ConnectionPtr ConnectionMap::getLocal(const ConnectionId& id) {
- Lock l(lock);
- if (id.getMember() != cluster.getId()) return 0;
- Map::const_iterator i = map.find(id);
- return i == map.end() ? 0 : i->second;
-}
-
ConnectionMap::Vector ConnectionMap::values() const {
Lock l(lock);
Vector result(map.size());
@@ -76,22 +69,16 @@
return result;
}
-void ConnectionMap::update(MemberId myId, const ClusterMap& cluster) {
- Lock l(lock);
- for (Map::iterator i = map.begin(); i != map.end(); ) {
- MemberId member = i->first.getMember();
- if (member != myId && !cluster.isMember(member)) {
- i->second->left();
- map.erase(i++);
- } else {
- i++;
- }
- }
-}
-
void ConnectionMap::clear() {
Lock l(lock);
map.clear();
}
+void ConnectionMap::decode(const EventHeader& eh, const void* data) {
+ ConnectionPtr connection = get(eh.getConnectionId());
+ if (connection)
+ connection->decode(eh, data);
+}
+
+
}} // namespace qpid::cluster
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h?rev=749473&r1=749472&r2=749473&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h Mon Mar 2 23:30:08 2009
@@ -60,18 +60,13 @@
*/
ConnectionPtr get(const ConnectionId& id);
- /** If ID is a local connection and in the map return it, else return 0 */
- ConnectionPtr getLocal(const ConnectionId& id);
-
/** Get connections for sending an update. */
Vector values() const;
- /** Remove connections who's members are no longer in the cluster. Deliver thread. */
- void update(MemberId myId, const ClusterMap& cluster);
+ /** Decode a connection data event. */
+ void decode(const EventHeader& eh, const void* data);
-
void clear();
-
size_t size() const;
private:
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp?rev=749473&r1=749472&r2=749473&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp Mon Mar 2 23:30:08 2009
@@ -23,6 +23,7 @@
#include "Cpg.h"
#include "qpid/framing/Buffer.h"
#include "qpid/framing/AMQFrame.h"
+#include "qpid/assert.h"
#include <ostream>
#include <iterator>
#include <algorithm>
@@ -31,6 +32,7 @@
namespace cluster {
using framing::Buffer;
+using framing::AMQFrame;
const size_t EventHeader::HEADER_SIZE =
sizeof(uint8_t) + // type
@@ -57,7 +59,7 @@
type = (EventType)buf.getOctet();
if(type != DATA && type != CONTROL)
throw Exception("Invalid multicast event type");
- connectionId = ConnectionId(m, reinterpret_cast<Connection*>(buf.getLongLong()));
+ connectionId = ConnectionId(m, buf.getLongLong());
size = buf.getLong();
#ifdef QPID_LATENCY_METRIC
latency_metric_timestamp = buf.getLongLong();
@@ -93,7 +95,7 @@
void EventHeader::encode(Buffer& b) const {
b.putOctet(type);
- b.putLongLong(reinterpret_cast<uint64_t>(connectionId.getPointer()));
+ b.putLongLong(connectionId.getNumber());
b.putLong(size);
#ifdef QPID_LATENCY_METRIC
b.putLongLong(latency_metric_timestamp);
@@ -111,6 +113,14 @@
return Buffer(const_cast<char*>(getData()), getSize());
}
+AMQFrame Event::getFrame() const {
+ assert(type == CONTROL);
+ Buffer buf(*this);
+ AMQFrame frame;
+ QPID_ASSERT(frame.decode(buf));
+ return frame;
+}
+
static const char* EVENT_TYPE_NAMES[] = { "data", "control" };
std::ostream& operator << (std::ostream& o, EventType t) {
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h?rev=749473&r1=749472&r2=749473&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h Mon Mar 2 23:30:08 2009
@@ -24,6 +24,7 @@
#include "types.h"
#include "qpid/RefCountedBuffer.h"
+#include "qpid/framing/AMQFrame.h"
#include "qpid/sys/LatencyMetric.h"
#include <sys/uio.h> // For iovec
#include <iosfwd>
@@ -59,8 +60,8 @@
uint64_t getSequence() const { return sequence; }
void setSequence(uint64_t n) { sequence = n; }
- bool isCluster() const { return connectionId.getPointer() == 0; }
- bool isConnection() const { return connectionId.getPointer() != 0; }
+ bool isCluster() const { return connectionId.getNumber() == 0; }
+ bool isConnection() const { return connectionId.getNumber() != 0; }
protected:
static const size_t HEADER_SIZE;
@@ -97,6 +98,8 @@
// Store including header
char* getStore() { return store; }
const char* getStore() const { return store; }
+
+ framing::AMQFrame getFrame() const;
operator framing::Buffer() const;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h?rev=749473&r1=749472&r2=749473&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h Mon Mar 2 23:30:08 2009
@@ -42,8 +42,8 @@
EventFrame(const EventHeader& e, const framing::AMQFrame& f, int rc=0);
- bool isCluster() const { return !connectionId.getPointer(); }
- bool isConnection() const { return connectionId.getPointer(); }
+ bool isCluster() const { return connectionId.getNumber() == 0; }
+ bool isConnection() const { return connectionId.getNumber() != 0; }
bool isLastInEvent() const { return readCredit; }
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=749473&r1=749472&r2=749473&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Mon Mar 2 23:30:08 2009
@@ -95,7 +95,7 @@
: updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m),
frameId(frameId_), connections(cons),
connection(catchUpConnection()), shadowConnection(catchUpConnection()),
- done(ok), failed(fail)
+ done(ok), failed(fail), connectionSettings(cs)
{
connection.open(url, cs);
session = connection.newSession("update_shared");
@@ -228,13 +228,15 @@
shadowConnection = catchUpConnection();
broker::Connection& bc = updateConnection->getBrokerConnection();
- // FIXME aconway 2008-10-20: What authentication info to use on reconnect?
- shadowConnection.open(updateeUrl, bc.getUserId(), ""/*password*/, "/"/*vhost*/, bc.getFrameMax());
+ connectionSettings.maxFrameSize = bc.getFrameMax();
+ shadowConnection.open(updateeUrl, connectionSettings);
bc.eachSessionHandler(boost::bind(&UpdateClient::updateSession, this, _1));
+ std::pair<const char*, size_t> fragment = updateConnection->getDecoder().getFragment();
ClusterConnectionProxy(shadowConnection).shadowReady(
updateConnection->getId().getMember(),
- reinterpret_cast<uint64_t>(updateConnection->getId().getPointer()),
- updateConnection->getBrokerConnection().getUserId()
+ updateConnection->getId().getNumber(),
+ bc.getUserId(),
+ string(fragment.first, fragment.second)
);
shadowConnection.close();
QPID_LOG(debug, updaterId << " updated connection " << *updateConnection);
@@ -285,9 +287,6 @@
if (inProgress) {
inProgress->getFrames().map(simpl->out);
}
-
- // FIXME aconway 2008-09-23: update session replay list.
-
QPID_LOG(debug, updaterId << " updated session " << sh.getSession()->getId());
}
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h?rev=749473&r1=749472&r2=749473&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h Mon Mar 2 23:30:08 2009
@@ -98,6 +98,7 @@
client::AsyncSession session, shadowSession;
boost::function<void()> done;
boost::function<void(const std::exception& e)> failed;
+ client::ConnectionSettings connectionSettings;
};
}} // namespace qpid::cluster
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/types.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h?rev=749473&r1=749472&r2=749473&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/types.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/types.h Mon Mar 2 23:30:08 2009
@@ -68,17 +68,16 @@
std::ostream& operator<<(std::ostream&, const MemberId&);
-struct ConnectionId : public std::pair<MemberId, Connection*> {
- ConnectionId(const MemberId& m=MemberId(), Connection* c=0) : std::pair<MemberId, Connection*> (m,c) {}
- ConnectionId(uint64_t m, uint64_t c)
- : std::pair<MemberId, Connection*>(MemberId(m), reinterpret_cast<Connection*>(c)) {}
+struct ConnectionId : public std::pair<MemberId, uint64_t> {
+ ConnectionId(const MemberId& m=MemberId(), uint64_t c=0) : std::pair<MemberId, uint64_t> (m,c) {}
+ ConnectionId(uint64_t m, uint64_t c) : std::pair<MemberId, uint64_t>(MemberId(m), c) {}
MemberId getMember() const { return first; }
- Connection* getPointer() const { return second; }
+ uint64_t getNumber() const { return second; }
};
std::ostream& operator<<(std::ostream&, const ConnectionId&);
-std::ostream& operator << (std::ostream&, EventType);
+std::ostream& operator<<(std::ostream&, EventType);
}} // namespace qpid::cluster
Modified: qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.cpp?rev=749473&r1=749472&r2=749473&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.cpp Mon Mar 2 23:30:08 2009
@@ -21,8 +21,9 @@
#include "FrameDecoder.h"
#include "Buffer.h"
#include "qpid/log/Statement.h"
-#include <algorithm>
#include "qpid/framing/reply_exceptions.h"
+#include <algorithm>
+#include <string.h>
namespace qpid {
namespace framing {
@@ -67,4 +68,13 @@
return false;
}
+void FrameDecoder::setFragment(const char* data, size_t size) {
+ fragment.resize(size);
+ ::memcpy(fragment.data(), data, size);
+}
+
+std::pair<const char*, size_t> FrameDecoder::getFragment() const {
+ return std::pair<const char*, size_t>(fragment.data(), fragment.size());
+}
+
}} // namespace qpid::framing
Modified: qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.h?rev=749473&r1=749472&r2=749473&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/framing/FrameDecoder.h Mon Mar 2 23:30:08 2009
@@ -35,9 +35,16 @@
{
public:
bool decode(Buffer& buffer);
- AMQFrame frame;
+ const AMQFrame& getFrame() const { return frame; }
+ AMQFrame& getFrame() { return frame; }
+
+ void setFragment(const char*, size_t);
+ std::pair<const char*, size_t> getFragment() const;
+
private:
std::vector<char> fragment;
+ AMQFrame frame;
+
};
}} // namespace qpid::framing
Modified: qpid/trunk/qpid/cpp/src/tests/ClusterFixture.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ClusterFixture.cpp?rev=749473&r1=749472&r2=749473&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ClusterFixture.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/ClusterFixture.cpp Mon Mar 2 23:30:08 2009
@@ -109,7 +109,7 @@
Args args(makeArgs(prefix));
vector<const char*> argv(args.size());
transform(args.begin(), args.end(), argv.begin(), boost::bind(&string::c_str, _1));
- qpid::log::Logger::instance().setPrefix(os.str());
+ qpid::log::Logger::instance().setPrefix(prefix);
localBroker.reset(new BrokerFixture(parseOpts(argv.size(), &argv[0])));
push_back(localBroker->getPort());
forkedBrokers.push_back(shared_ptr<ForkedBroker>());
Modified: qpid/trunk/qpid/cpp/src/tests/FrameDecoder.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/FrameDecoder.cpp?rev=749473&r1=749472&r2=749473&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/FrameDecoder.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/FrameDecoder.cpp Mon Mar 2 23:30:08 2009
@@ -65,7 +65,7 @@
}
Buffer buf(&encoded[encoded.size()-1], 1);
BOOST_CHECK(decoder.decode(buf));
- BOOST_CHECK_EQUAL(data, getData(decoder.frame));
+ BOOST_CHECK_EQUAL(data, getData(decoder.getFrame()));
}
Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=749473&r1=749472&r2=749473&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Mon Mar 2 23:30:08 2009
@@ -125,6 +125,7 @@
<field name="member-id" type="uint64"/>
<field name="connection-id" type="uint64"/>
<field name="user-name" type="str8"/>
+ <field name="fragment" type="str32"/>
</control>
<!-- Complete a cluster state update. -->
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org
Re: svn commit: r749473 - in /qpid/trunk/qpid/cpp: src/
src/qpid/cluster/ src/qpid/framing/ src/tests/ xml/
Posted by Aidan Skinner <ai...@gmail.com>.
On Tue, Mar 3, 2009 at 5:58 PM, Gordon Sim <gs...@redhat.com> wrote:
> Aidan Skinner wrote:
>>
>> Stil broken for me, I'm at 749627:
>
> Try 749669.
That fixed it, thanks! :)
- Aidan
--
Apache Qpid - World Domination through Advanced Message Queueing
http://qpid.apache.org
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org
Re: svn commit: r749473 - in /qpid/trunk/qpid/cpp: src/ src/qpid/cluster/
src/qpid/framing/ src/tests/ xml/
Posted by Gordon Sim <gs...@redhat.com>.
Aidan Skinner wrote:
> Stil broken for me, I'm at 749627:
Try 749669.
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org
Re: svn commit: r749473 - in /qpid/trunk/qpid/cpp: src/
src/qpid/cluster/ src/qpid/framing/ src/tests/ xml/
Posted by Aidan Skinner <ai...@gmail.com>.
On Tue, Mar 3, 2009 at 4:54 PM, Gordon Sim <gs...@redhat.com> wrote:
> Aidan Skinner wrote:
>>
>> On Mon, Mar 2, 2009 at 11:30 PM, <ac...@apache.org> wrote:
>>
>>> Author: aconway
>>> Date: Mon Mar 2 23:30:08 2009
>>> New Revision: 749473
>>>
>>> URL: http://svn.apache.org/viewvc?rev=749473&view=rev
>>> Log:
>>>
>>> Replicate connection decoder fragments to new members.
>>
>> This seems to have broken the C++ build for me on RHEL4
>
> Should be fixed in 749621.
Stil broken for me, I'm at 749627:
qpid/framing/FrameDecoder.cpp: In member function `std::pair<const
char*, size_t> qpid::framing::FrameDecoder::getFragment() const':
qpid/framing/FrameDecoder.cpp:77: error: 'const class
std::vector<char, std::allocator<char> >' has no member named 'data'
- Aidan
--
Apache Qpid - World Domination through Advanced Message Queueing
http://qpid.apache.org
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org
Re: svn commit: r749473 - in /qpid/trunk/qpid/cpp: src/ src/qpid/cluster/
src/qpid/framing/ src/tests/ xml/
Posted by Gordon Sim <gs...@redhat.com>.
Aidan Skinner wrote:
> On Mon, Mar 2, 2009 at 11:30 PM, <ac...@apache.org> wrote:
>
>> Author: aconway
>> Date: Mon Mar 2 23:30:08 2009
>> New Revision: 749473
>>
>> URL: http://svn.apache.org/viewvc?rev=749473&view=rev
>> Log:
>>
>> Replicate connection decoder fragments to new members.
>
> This seems to have broken the C++ build for me on RHEL4
Should be fixed in 749621.
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org
Re: svn commit: r749473 - in /qpid/trunk/qpid/cpp: src/
src/qpid/cluster/ src/qpid/framing/ src/tests/ xml/
Posted by Aidan Skinner <ai...@apache.org>.
On Mon, Mar 2, 2009 at 11:30 PM, <ac...@apache.org> wrote:
> Author: aconway
> Date: Mon Mar 2 23:30:08 2009
> New Revision: 749473
>
> URL: http://svn.apache.org/viewvc?rev=749473&view=rev
> Log:
>
> Replicate connection decoder fragments to new members.
This seems to have broken the C++ build for me on RHEL4, I get:
g++ -DHAVE_CONFIG_H -I. -I. -I. -Igen -I./gen -pedantic -Wall -Wextra
-Wno-shadow -Wpointer-arith -Wcast-qual -Wcast-align -Wno-long-long
-Winvalid-pch -Wno-system-headers -Woverloaded-virtual -g -O2 -MT
qpid/framing/FrameDecoder.lo -MD -MP -MF
qpid/framing/.deps/FrameDecoder.Tpo -c qpid/framing/FrameDecoder.cpp
-fPIC -DPIC -o qpid/framing/.libs/FrameDecoder.o
qpid/framing/FrameDecoder.cpp: In member function `void
qpid::framing::FrameDecoder::setFragment(const char*, size_t)':
qpid/framing/FrameDecoder.cpp:73: error: 'class std::vector<char,
std::allocator<char> >' has no member named 'data'
qpid/framing/FrameDecoder.cpp: In member function `std::pair<const
char*, size_t> qpid::framing::FrameDecoder::getFragment() const':
qpid/framing/FrameDecoder.cpp:77: error: 'const class
std::vector<char, std::allocator<char> >' has no member named 'data'
:(
- Aidan
--
Apache Qpid - World Domination through Advanced Message Queueing
http://qpid.apache.org
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org