You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/03/09 00:52:35 UTC
svn commit: r751557 - in /qpid/trunk/qpid/cpp: src/ src/qpid/cluster/
src/qpid/framing/ xml/
Author: aconway
Date: Sun Mar 8 23:52:35 2009
New Revision: 751557
URL: http://svn.apache.org/viewvc?rev=751557&view=rev
Log:
Fixed race conditions in cluster.
Execute all cluster logic in frameDeliverQueue thread,
decoding only in eventDeliverQueue thread.
Added:
qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp (with props)
qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h (with props)
qpid/trunk/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h (with props)
Removed:
qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h
Modified:
qpid/trunk/qpid/cpp/src/cluster.mk
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h
qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h
qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp
qpid/trunk/qpid/cpp/xml/cluster.xml
Modified: qpid/trunk/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/cluster.mk?rev=751557&r1=751556&r2=751557&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ qpid/trunk/qpid/cpp/src/cluster.mk Sun Mar 8 23:52:35 2009
@@ -40,6 +40,8 @@
$(CMAN_SOURCES) \
qpid/cluster/Cluster.cpp \
qpid/cluster/Cluster.h \
+ qpid/cluster/Decoder.cpp \
+ qpid/cluster/Decoder.h \
qpid/cluster/PollableQueue.h \
qpid/cluster/ClusterMap.cpp \
qpid/cluster/ClusterMap.h \
@@ -49,8 +51,6 @@
qpid/cluster/Connection.h \
qpid/cluster/ConnectionCodec.cpp \
qpid/cluster/ConnectionCodec.h \
- qpid/cluster/ConnectionMap.h \
- qpid/cluster/ConnectionMap.cpp \
qpid/cluster/Cpg.cpp \
qpid/cluster/Cpg.h \
qpid/cluster/Dispatchable.h \
@@ -65,6 +65,7 @@
qpid/cluster/FailoverExchange.cpp \
qpid/cluster/FailoverExchange.h \
qpid/cluster/UpdateExchange.h \
+ qpid/cluster/LockedConnectionMap.h \
qpid/cluster/Multicaster.cpp \
qpid/cluster/Multicaster.h \
qpid/cluster/McastFrameHandler.h \
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=751557&r1=751556&r2=751557&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Sun Mar 8 23:52:35 2009
@@ -107,11 +107,11 @@
boost::bind(&Cluster::leave, this),
"Error delivering frames",
poller),
- connections(*this),
- frameId(0),
initialized(false),
+ decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
+ discarding(true),
state(INIT),
- eventId(0),
+ frameId(0),
lastSize(0),
lastBroker(false)
{
@@ -156,14 +156,19 @@
// Called in connection thread to insert a client connection.
void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) {
- connections.insert(c);
+ localConnections.insert(c);
}
// Called in connection thread to insert an updated shadow connection.
void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) {
- connections.insert(c);
+ // Safe to use connections here because we're pre-catchup, either
+ // discarding or stalled, so deliveredFrame is not processing any
+ // connection events.
+ assert(discarding);
+ connections.insert(ConnectionMap::value_type(c->getId(), c));
}
+// Called by Connection::deliverClose() in deliverFrameQueue thread.
void Cluster::erase(const ConnectionId& id) {
connections.erase(id);
}
@@ -195,7 +200,6 @@
if (state != LEFT) {
state = LEFT;
QPID_LOG(notice, *this << " leaving cluster " << name);
- connections.clear();
try { broker.shutdown(); }
catch (const std::exception& e) {
QPID_LOG(critical, *this << " error during broker shutdown: " << e.what());
@@ -217,58 +221,89 @@
Event e(Event::decodeCopy(from, buf));
if (from == self) // Record self-deliveries for flow control.
mcast.selfDeliver(e);
- deliver(e);
+ deliverEvent(e);
}
-void Cluster::deliver(const Event& e) {
+void Cluster::deliverEvent(const Event& e) {
deliverEventQueue.push(e);
}
+void Cluster::deliverFrame(const EventFrame& e) {
+ deliverFrameQueue.push(e);
+}
+
// Handler for deliverEventQueue.
-// This thread executes cluster controls and decodes connection data events.
-void Cluster::deliveredEvent(const Event& event) {
- Event e(event);
- Mutex::ScopedLock l(lock);
- if (state >= CATCHUP) {
- e.setId(++eventId);
+// This thread decodes frames from events.
+void Cluster::deliveredEvent(const Event& e) {
QPID_LOG(trace, *this << " DLVR: " << e);
- }
- if (e.isCluster()) { // Cluster control, process in this thread.
+ if (e.isCluster()) {
EventFrame ef(e, e.getFrame());
- QPID_LOG(trace, *this << " DLVR: " << ef);
- ClusterDispatcher dispatch(*this, e.getConnectionId().getMember(), l);
- if (!framing::invoke(dispatch, *ef.frame.getBody()).wasHandled())
- throw Exception(QPID_MSG("Invalid cluster control"));
- }
- else if (state >= CATCHUP) { // Handle connection frames
- if (e.getType() == CONTROL)
- connectionFrame(EventFrame(e, e.getFrame()));
+ // Stop the deliverEventQueue on update offers.
+ // This preserves the connection decoder fragments for an update.
+ ClusterUpdateOfferBody* offer = dynamic_cast<ClusterUpdateOfferBody*>(ef.frame.getBody());
+ if (offer)
+ deliverEventQueue.stop();
+ deliverFrame(ef);
+ }
+ else if(!discarding) {
+ if (e.isControl())
+ deliverFrame(EventFrame(e, e.getFrame()));
else
- connections.decode(e, e.getData());
- }
- // Drop connection frames while state < CATCHUP
+ decoder.decode(e, e.getData());
}
-
-void Cluster::connectionFrame(const EventFrame& frame) {
- deliverFrameQueue.push(frame);
+ else // Discard connection events if discarding is set.
+ QPID_LOG(trace, *this << " DROP: " << e);
}
// Handler for deliverFrameQueue.
-// This thread executes connection control and data frames.
-void Cluster::deliveredFrame(const EventFrame& event) {
- // No lock, only use connections, not Cluster state.
- EventFrame e(event);
- if(!e.frame.getBody()) { // marks the stall point, start the update task.
- updateThread=Thread(*updateTask);
+// This thread executes the main logic.
+void Cluster::deliveredFrame(const EventFrame& e) {
+ Mutex::ScopedLock l(lock);
+ if (e.isCluster()) {
+ QPID_LOG(trace, *this << " DLVR: " << e);
+ ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l);
+ if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled())
+ throw Exception(QPID_MSG("Invalid cluster control"));
}
- else {
+ else if (state >= CATCHUP) {
QPID_LOG(trace, *this << " DLVR: " << e);
- if (e.type == DATA) // Add cluster-id to to data frames.
- e.frame.setClusterId(frameId++);
- boost::intrusive_ptr<Connection> connection = connections.get(e.connectionId);
+ EventFrame ef(e); // Non-const copy
+ if (ef.type == DATA) // Add cluster-id to to data frames.
+ ef.frame.setClusterId(frameId++);
+ ConnectionPtr connection = getConnection(e.connectionId, l);
if (connection)
connection->deliveredFrame(e);
}
+ else // Drop connection frames while state < CATCHUP
+ QPID_LOG(trace, *this << " DROP: " << e);
+}
+
+// Called in deliverFrameQueue thread
+ConnectionPtr Cluster::getConnection(const ConnectionId& id, Lock&) {
+ ConnectionPtr cp;
+ ConnectionMap::iterator i = connections.find(id);
+ if (i != connections.end())
+ cp = i->second;
+ else {
+ if(id.getMember() == self)
+ cp = localConnections.getErase(id);
+ else {
+ // New remote connection, create a shadow.
+ std::ostringstream mgmtId;
+ mgmtId << id;
+ cp = new Connection(*this, shadowOut, mgmtId.str(), id);
+ }
+ if (cp)
+ connections.insert(ConnectionMap::value_type(id, cp));
+ }
+ return cp;
+}
+
+Cluster::ConnectionVector Cluster::getConnections(Lock&) {
+ ConnectionVector result(connections.size());
+ std::transform(connections.begin(), connections.end(), result.begin(),
+ boost::bind(&ConnectionMap::value_type::second, _1));
+ return result;
}
struct AddrList {
@@ -316,7 +351,7 @@
std::string addresses;
for (cpg_address* p = current; p < current+nCurrent; ++p)
addresses.append(MemberId(*p).str());
- deliver(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), self));
+ deliverEvent(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), self));
}
void Cluster::setReady(Lock&) {
@@ -337,6 +372,7 @@
if (state == INIT) { // First configChange
if (map.aliveCount() == 1) {
setClusterId(true, l);
+ discarding = false;
setReady(l);
map = ClusterMap(self, myUrl, true);
memberUpdate(l);
@@ -396,28 +432,18 @@
}
}
-void Cluster::stall(Lock&) {
- // Stop processing the deliveredEventQueue in order to send or
- // recieve an update.
- deliverEventQueue.stop();
-}
-
-void Cluster::unstall(Lock&) {
- // Stop processing the deliveredEventQueue in order to send or
- // recieve an update.
- deliverEventQueue.start();
-}
-
void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uuid& uuid, Lock& l) {
+ // NOTE: deliverEventQueue has been stopped at the update offer by
+ // deliveredEvent in case an update is required.
if (state == LEFT) return;
MemberId updatee(updateeInt);
boost::optional<Url> url = map.updateOffer(updater, updatee);
if (updater == self) {
assert(state == OFFER);
- if (url) { // My offer was first.
+ if (url) // My offer was first.
updateStart(updatee, *url, l);
- }
else { // Another offer was first.
+ deliverEventQueue.start(); // Don't need to update
setReady(l);
QPID_LOG(info, *this << " cancelled update offer to " << updatee);
makeOffer(map.firstJoiner(), l); // Maybe make another offer.
@@ -428,50 +454,48 @@
setClusterId(uuid, l);
state = UPDATEE;
QPID_LOG(info, *this << " receiving update from " << updater);
- stall(l);
checkUpdateIn(l);
}
+ else
+ deliverEventQueue.start(); // Don't need to update
}
void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) {
+ // NOTE: deliverEventQueue is already stopped at the stall point by deliveredEvent.
if (state == LEFT) return;
assert(state == OFFER);
state = UPDATER;
- QPID_LOG(info, *this << " stall for update to " << updatee << " at " << url);
- stall(l);
-
+ QPID_LOG(info, *this << " sending update to " << updatee << " at " << url);
if (updateThread.id())
updateThread.join(); // Join the previous updateThread to avoid leaks.
client::ConnectionSettings cs;
cs.username = settings.username;
cs.password = settings.password;
cs.mechanism = settings.mechanism;
- updateTask = new UpdateClient(self, updatee, url, broker, map, eventId, frameId, connections.values(),
+ updateThread = Thread(
+ new UpdateClient(self, updatee, url, broker, map, frameId, getConnections(l), decoder,
boost::bind(&Cluster::updateOutDone, this),
boost::bind(&Cluster::updateOutError, this, _1),
- cs);
- // Push an empty frame onto the deliverFrameQueue to mark the stall point.
- // The deliverFrameQueue thread will start the update at that point.
- deliverFrameQueue.push(EventFrame(EventHeader(), AMQFrame()));
+ cs));
}
// Called in update thread.
-void Cluster::updateInDone(const ClusterMap& m, uint64_t eventId_, uint64_t frameId_) {
+void Cluster::updateInDone(const ClusterMap& m, uint64_t frameId_) {
Lock l(lock);
updatedMap = m;
- eventId = eventId_;
- // Safe to use frameId here because we are stalled: deliveredFrame cannot be called concurrently.
+ // Safe to set frameId here because we are stalled: deliveredFrame cannot be called concurrently.
frameId = frameId_;
checkUpdateIn(l);
}
-void Cluster::checkUpdateIn(Lock& l) {
+void Cluster::checkUpdateIn(Lock&) {
if (state == UPDATEE && updatedMap) {
map = *updatedMap;
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
state = CATCHUP;
+ discarding = false; // ok to set, we're stalled for update.
QPID_LOG(info, *this << " received update, starting catch-up");
- unstall(l);
+ deliverEventQueue.start();
}
}
@@ -485,7 +509,7 @@
assert(state == UPDATER);
state = READY;
mcast.release();
- unstall(l);
+ deliverEventQueue.start(); // Start processing events again.
makeOffer(map.firstJoiner(), l); // Try another offer
}
@@ -569,15 +593,13 @@
mgmtObject->set_memberIDs(idstr);
}
- // Generate a deliver-close control frame for connections
- // belonging to defunct members, so they will be erased in the
- // deliverFrameQueue thread.
- ConnectionMap::Vector c = connections.values();
- for (ConnectionMap::Vector::iterator i = c.begin(); i != c.end(); ++i) {
- ConnectionId cid = (*i)->getId();
- MemberId mid = cid.getMember();
- if (mid != self && !map.isMember(mid))
- connectionFrame(EventFrame(EventHeader(CONTROL, cid), AMQFrame(ClusterConnectionDeliverCloseBody())));
+ // Erase connections belonging to members that have left the cluster.
+ ConnectionMap::iterator i = connections.begin();
+ while (i != connections.end()) {
+ ConnectionMap::iterator j = i++;
+ MemberId m = j->second->getId().getMember();
+ if (m != self && !map.isMember(m))
+ connections.erase(j);
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=751557&r1=751556&r2=751557&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Sun Mar 8 23:52:35 2009
@@ -19,33 +19,34 @@
*
*/
-#include "ClusterSettings.h"
#include "ClusterMap.h"
-#include "ConnectionMap.h"
+#include "ClusterSettings.h"
#include "Cpg.h"
+#include "Decoder.h"
#include "Event.h"
+#include "EventFrame.h"
+#include "ExpiryPolicy.h"
#include "FailoverExchange.h"
+#include "LockedConnectionMap.h"
#include "Multicaster.h"
-#include "EventFrame.h"
#include "NoOpConnectionOutputHandler.h"
+#include "PollableQueue.h"
#include "PollerDispatch.h"
#include "Quorum.h"
-#include "PollableQueue.h"
-#include "ExpiryPolicy.h"
+#include "qmf/org/apache/qpid/cluster/Cluster.h"
+#include "qpid/Url.h"
#include "qpid/broker/Broker.h"
-#include "qpid/sys/Monitor.h"
#include "qpid/management/Manageable.h"
-#include "qpid/Url.h"
-#include "qmf/org/apache/qpid/cluster/Cluster.h"
+#include "qpid/sys/Monitor.h"
-#include <boost/intrusive_ptr.hpp>
#include <boost/bind.hpp>
+#include <boost/intrusive_ptr.hpp>
#include <boost/optional.hpp>
#include <algorithm>
-#include <vector>
#include <map>
+#include <vector>
namespace qpid {
@@ -57,6 +58,7 @@
namespace cluster {
class Connection;
+class EventFrame;
/**
* Connection to the cluster
@@ -64,7 +66,7 @@
class Cluster : private Cpg::Handler, public management::Manageable {
public:
typedef boost::intrusive_ptr<Connection> ConnectionPtr;
- typedef std::vector<ConnectionPtr> Connections;
+ typedef std::vector<ConnectionPtr> ConnectionVector;
// Public functions are thread safe unless otherwise mentioned in a comment.
@@ -90,7 +92,7 @@
void leave();
// Update completed - called in update thread
- void updateInDone(const ClusterMap&, uint64_t eventId, uint64_t frameId);
+ void updateInDone(const ClusterMap&, uint64_t frameId);
MemberId getId() const;
broker::Broker& getBroker() const;
@@ -101,15 +103,19 @@
size_t getReadMax() { return readMax; }
size_t getWriteEstimate() { return writeEstimate; }
- // Process a connection frame. Called by Connection with decoded frames.
- // Thread safety: only called in deliverEventQueue thread.
- void connectionFrame(const EventFrame&);
+ void deliverFrame(const EventFrame&);
+
+ // Called only during update by Connection::shadowReady
+ Decoder& getDecoder() { return decoder; }
private:
typedef sys::Monitor::ScopedLock Lock;
typedef PollableQueue<Event> PollableEventQueue;
typedef PollableQueue<EventFrame> PollableFrameQueue;
+ typedef std::map<ConnectionId, ConnectionPtr> ConnectionMap;
+
+ // FIXME aconway 2009-03-07: sort functions by thread
// NB: A dummy Lock& parameter marks functions that must only be
// called with Cluster::lock locked.
@@ -118,33 +124,33 @@
std::vector<std::string> getIds(Lock&) const;
std::vector<Url> getUrls(Lock&) const;
- // Make an offer if we can - called in deliver thread.
- void makeOffer(const MemberId&, Lock&);
-
- // Called in main thread from Broker destructor.
+ // == Called in main thread from Broker destructor.
void brokerShutdown();
+ // == Called in deliverEventQueue thread
+ void deliveredEvent(const Event&);
+
+ // == Called in deliverFrameQueue thread
+ void deliveredFrame(const EventFrame&);
+
// Cluster controls implement XML methods from cluster.xml.
- // Called in deliverEventQueue thread.
void updateRequest(const MemberId&, const std::string&, Lock&);
void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, Lock&);
void ready(const MemberId&, const std::string&, Lock&);
void configChange(const MemberId&, const std::string& addresses, Lock& l);
void messageExpired(const MemberId&, uint64_t, Lock& l);
void shutdown(const MemberId&, Lock&);
- // Helper, called by updateOffer.
- void updateStart(const MemberId& updatee, const Url& url, Lock&);
-
- // Used by cluster controls.
- void stall(Lock&);
- void unstall(Lock&);
-
- // Handlers for pollable queues.
- void deliveredEvent(const Event&);
- void deliveredFrame(const EventFrame&);
+ // Helper functions
+ ConnectionPtr getConnection(const ConnectionId&, Lock&);
+ ConnectionVector getConnections(Lock&);
+ void updateStart(const MemberId& updatee, const Url& url, Lock&);
+ void makeOffer(const MemberId&, Lock&);
void setReady(Lock&);
+ void memberUpdate(Lock&);
+ void setClusterId(const framing::Uuid&, Lock&);
+ // == Called in CPG dispatch thread
void deliver( // CPG deliver callback.
cpg_handle_t /*handle*/,
struct cpg_name *group,
@@ -153,7 +159,7 @@
void* /*msg*/,
int /*msg_len*/);
- void deliver(const Event&);
+ void deliverEvent(const Event&);
void configChange( // CPG config change callback.
cpg_handle_t /*handle*/,
@@ -163,23 +169,21 @@
struct cpg_address */*joined*/, int /*nJoined*/
);
+ // == Called in management threads.
virtual qpid::management::ManagementObject* GetManagementObject() const;
virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
void stopClusterNode(Lock&);
void stopFullCluster(Lock&);
- void memberUpdate(Lock&);
- // Called in connection IO threads .
+ // == Called in connection IO threads .
void checkUpdateIn(Lock&);
- // Called in UpdateClient thread.
+ // == Called in UpdateClient thread.
void updateOutDone();
void updateOutError(const std::exception&);
void updateOutDone(Lock&);
- void setClusterId(const framing::Uuid&, Lock&);
-
// Immutable members set on construction, never changed.
ClusterSettings settings;
broker::Broker& broker;
@@ -203,17 +207,23 @@
PollableFrameQueue deliverFrameQueue;
boost::shared_ptr<FailoverExchange> failoverExchange;
Quorum quorum;
- ConnectionMap connections;
-
- // Used only in deliverFrameQueue thread or in deliverEventQueue thread when stalled.
- uint64_t frameId;
+ LockedConnectionMap localConnections;
// Used only during initialization
bool initialized;
- // Remaining members are protected by lock
+ // Used only in deliverEventQueue thread or when stalled for update.
+ Decoder decoder;
+ bool discarding;
+
+ // Remaining members are protected by lock.
+ // FIXME aconway 2009-03-06: Most of these members are also only used in
+ // deliverFrameQueue thread or during stall. Review and separate members
+ // that require a lock, drop lock when not needed.
+ //
mutable sys::Monitor lock;
+
// Local cluster state, cluster map
enum {
INIT, ///< Initial state, no CPG messages received.
@@ -226,13 +236,13 @@
LEFT ///< Final state, left the cluster.
} state;
- uint64_t eventId;
+ ConnectionMap connections;
+ uint64_t frameId;
ClusterMap map;
ClusterMap::Set elders;
size_t lastSize;
bool lastBroker;
sys::Thread updateThread;
- sys::Runnable* updateTask;
boost::optional<ClusterMap> updatedMap;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=751557&r1=751556&r2=751557&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Sun Mar 8 23:52:35 2009
@@ -150,7 +150,8 @@
void Connection::deliveredFrame(const EventFrame& f) {
assert(!catchUp);
currentChannel = f.frame.getChannel();
- if (!framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol.
+ if (f.frame.getBody() // frame can be emtpy with just readCredit
+ && !framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol.
&& !checkUnsupported(*f.frame.getBody())) // Unsupported operation.
{
if (f.type == DATA) // incoming data frames to broker::Connection
@@ -234,29 +235,6 @@
return size;
}
-// Decode a data event, a read buffer that has been delivered by the cluster.
-void Connection::decode(const EventHeader& eh, const void* data) {
- assert(eh.getType() == DATA); // Only handle connection data events.
- const char* cp = static_cast<const char*>(data);
- Buffer buf(const_cast<char*>(cp), eh.getSize());
- if (clusterDecoder.decode(buf)) { // Decoded a frame
- AMQFrame frame(clusterDecoder.getFrame());
- while (clusterDecoder.decode(buf)) {
- cluster.connectionFrame(EventFrame(eh, frame));
- frame = clusterDecoder.getFrame();
- }
- // Set read-credit on the last frame ending in this event.
- // Credit will be given when this frame is processed.
- cluster.connectionFrame(EventFrame(eh, frame, 1));
- }
- else {
- // We must give 1 unit read credit per event.
- // This event does not complete any frames so
- // we give read credit directly.
- giveReadCredit(1);
- }
-}
-
broker::SessionState& Connection::sessionState() {
return *connection.getChannel(currentChannel).getSession();
}
@@ -297,12 +275,13 @@
QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadowId);
self = shadowId;
connection.setUserId(username);
- clusterDecoder.setFragment(fragment.data(), fragment.size());
+ // OK to use decoder here because we are stalled for update.
+ cluster.getDecoder().get(self).setFragment(fragment.data(), fragment.size());
}
-void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t eventId, uint64_t frameId) {
+void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameId) {
QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
- cluster.updateInDone(ClusterMap(joiners, members), eventId, frameId);
+ cluster.updateInDone(ClusterMap(joiners, members), frameId);
self.second = 0; // Mark this as completed update connection.
}
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=751557&r1=751556&r2=751557&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Sun Mar 8 23:52:35 2009
@@ -34,8 +34,8 @@
#include "qpid/sys/AtomicValue.h"
#include "qpid/sys/ConnectionInputHandler.h"
#include "qpid/sys/ConnectionOutputHandler.h"
-#include "qpid/framing/FrameDecoder.h"
#include "qpid/framing/SequenceNumber.h"
+#include "qpid/framing/FrameDecoder.h"
#include <iosfwd>
@@ -103,9 +103,6 @@
// ConnectionCodec methods - called by IO layer with a read buffer.
size_t decode(const char* buffer, size_t size);
- // Decode a data event, a read buffer that has been delivered by the cluster.
- void decode(const EventHeader& eh, const void* data);
-
// Called for data delivered from the cluster.
void deliveredFrame(const EventFrame&);
@@ -123,7 +120,7 @@
void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment);
- void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t eventId, uint64_t frameId);
+ void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameId);
void deliveryRecord(const std::string& queue,
const framing::SequenceNumber& position,
@@ -153,8 +150,6 @@
void giveReadCredit(int credit);
- framing::FrameDecoder& getDecoder() { return clusterDecoder; }
-
private:
struct NullFrameHandler : public framing::FrameHandler {
void handle(framing::AMQFrame&) {}
@@ -179,7 +174,6 @@
WriteEstimate writeEstimate;
OutputInterceptor output;
framing::FrameDecoder localDecoder;
- framing::FrameDecoder clusterDecoder;
broker::Connection connection;
framing::SequenceNumber deliverSeq;
framing::ChannelId currentChannel;
Added: qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp?rev=751557&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp Sun Mar 8 23:52:35 2009
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Decoder.h"
+#include "EventFrame.h"
+#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/framing/AMQFrame.h"
+
+
+namespace qpid {
+namespace cluster {
+
+void Decoder::decode(const EventHeader& eh, const char* data) {
+ assert(eh.getType() == DATA); // Only handle connection data events.
+ const char* cp = static_cast<const char*>(data);
+ framing::Buffer buf(const_cast<char*>(cp), eh.getSize());
+ framing::FrameDecoder& decoder = map[eh.getConnectionId()];
+ if (decoder.decode(buf)) { // Decoded a frame
+ framing::AMQFrame frame(decoder.getFrame());
+ while (decoder.decode(buf)) {
+ process(EventFrame(eh, frame));
+ frame = decoder.getFrame();
+ }
+ // Set read-credit on the last frame ending in this event.
+ // Credit will be given when this frame is processed.
+ process(EventFrame(eh, frame, 1));
+ }
+ else {
+ // We must give 1 unit read credit per event.
+ // This event does not complete any frames so
+ // send an empty frame with the read credit.
+ process(EventFrame(EventHeader(), framing::AMQFrame(), 1));
+ }
+}
+
+void Decoder::process(const EventFrame& ef) {
+ if (ef.frame.getMethod() && ef.frame.getMethod()->isA<framing::ClusterConnectionDeliverCloseBody>())
+ map.erase(ef.connectionId);
+ callback(ef);
+}
+
+}} // namespace qpid::cluster
Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h?rev=751557&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h Sun Mar 8 23:52:35 2009
@@ -0,0 +1,57 @@
+#ifndef QPID_CLUSTER_DECODER_H
+#define QPID_CLUSTER_DECODER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+#include "qpid/framing/FrameDecoder.h"
+#include <boost/function.hpp>
+#include <map>
+
+namespace qpid {
+namespace cluster {
+
+class EventFrame;
+class EventHeader;
+
+/**
+ * A map of decoders for connections.
+ */
+class Decoder
+{
+ public:
+ typedef boost::function<void(const EventFrame&)> FrameHandler;
+
+ Decoder(FrameHandler fh) : callback(fh) {}
+ void decode(const EventHeader& eh, const char* data);
+ void erase(const ConnectionId&);
+ framing::FrameDecoder& get(const ConnectionId& c) { return map[c]; }
+
+ private:
+ typedef std::map<ConnectionId, framing::FrameDecoder> Map;
+ Map map;
+ void process(const EventFrame&);
+ FrameHandler callback;
+};
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_DECODER_H*/
Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp?rev=751557&r1=751556&r2=751557&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp Sun Mar 8 23:52:35 2009
@@ -44,7 +44,7 @@
;
EventHeader::EventHeader(EventType t, const ConnectionId& c, size_t s)
- : type(t), connectionId(c), size(s), id(0) {}
+ : type(t), connectionId(c), size(s) {}
Event::Event() {}
@@ -128,7 +128,7 @@
}
std::ostream& operator << (std::ostream& o, const EventHeader& e) {
- o << "Event[id=" << e.getId() << " connection=" << e.getConnectionId() << " " << e.getType() << " " << e.getSize() << " bytes]";
+ o << "Event[" << e.getConnectionId() << " " << e.getType() << " " << e.getSize() << " bytes]";
return o;
}
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h?rev=751557&r1=751556&r2=751557&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h Sun Mar 8 23:52:35 2009
@@ -57,11 +57,9 @@
/** Size of header + payload. */
size_t getStoreSize() { return size + HEADER_SIZE; }
- uint64_t getId() const { return id; }
- void setId(uint64_t n) { id = n; }
-
bool isCluster() const { return connectionId.getNumber() == 0; }
bool isConnection() const { return connectionId.getNumber() != 0; }
+ bool isControl() const { return type == CONTROL; }
protected:
static const size_t HEADER_SIZE;
@@ -69,7 +67,6 @@
EventType type;
ConnectionId connectionId;
size_t size;
- uint64_t id;
};
/**
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp?rev=751557&r1=751556&r2=751557&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp Sun Mar 8 23:52:35 2009
@@ -24,16 +24,16 @@
namespace qpid {
namespace cluster {
-EventFrame::EventFrame() : eventId(0) {}
+EventFrame::EventFrame() {}
EventFrame::EventFrame(const EventHeader& e, const framing::AMQFrame& f, int rc)
- : connectionId(e.getConnectionId()), frame(f), eventId(e.getId()), readCredit(rc), type(e.getType())
+ : connectionId(e.getConnectionId()), frame(f), readCredit(rc), type(e.getType())
{
QPID_LATENCY_INIT(frame);
}
std::ostream& operator<<(std::ostream& o, const EventFrame& e) {
- return o << e.frame << "(from event " << e.eventId << " read-credit=" << e.readCredit << ")";
+ return o << e.frame << " " << e.type << " " << e.connectionId << " read-credit=" << e.readCredit;
}
}} // namespace qpid::cluster
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h?rev=751557&r1=751556&r2=751557&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h Sun Mar 8 23:52:35 2009
@@ -47,16 +47,8 @@
bool isLastInEvent() const { return readCredit; }
- // True if this frame follows immediately after frame e.
- bool follows(const EventFrame& e) const {
- return eventId == e.eventId || (eventId == e.eventId+1 && e.readCredit);
- }
-
- bool operator<(const EventFrame& e) const { return eventId < e.eventId; }
-
ConnectionId connectionId;
framing::AMQFrame frame;
- uint64_t eventId;
int readCredit; ///< last frame in an event, give credit when processed.
EventType type;
};
Added: qpid/trunk/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h?rev=751557&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h Sun Mar 8 23:52:35 2009
@@ -0,0 +1,62 @@
+#ifndef QPID_CLUSTER_LOCKEDCONNECTIONMAP_H
+#define QPID_CLUSTER_LOCKEDCONNECTIONMAP_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+#include "qpid/sys/Mutex.h"
+#include "Connection.h"
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * Thread safe map of connections.
+ */
+class LockedConnectionMap
+{
+ public:
+ void insert(const ConnectionPtr& c) {
+ sys::Mutex::ScopedLock l(lock);
+ map[c->getId()] = c;
+ }
+
+ ConnectionPtr getErase(const ConnectionId& c) {
+ sys::Mutex::ScopedLock l(lock);
+ Map::iterator i = map.find(c);
+ if (i != map.end()) {
+ ConnectionPtr cp = i->second;
+ map.erase(i);
+ return cp;
+ }
+ else
+ return 0;
+ }
+
+ private:
+ typedef std::map<ConnectionId, ConnectionPtr> Map;
+ mutable sys::Mutex lock;
+ Map map;
+};
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_LOCKEDCONNECTIONMAP_H*/
Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=751557&r1=751556&r2=751557&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Sun Mar 8 23:52:35 2009
@@ -22,6 +22,7 @@
#include "Cluster.h"
#include "ClusterMap.h"
#include "Connection.h"
+#include "Decoder.h"
#include "qpid/client/SessionBase_0_10Access.h"
#include "qpid/client/ConnectionAccess.h"
#include "qpid/broker/Broker.h"
@@ -86,14 +87,14 @@
// TODO aconway 2008-09-24: optimization: update connections/sessions in parallel.
UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const Url& url,
- broker::Broker& broker, const ClusterMap& m, uint64_t eventId_, uint64_t frameId_,
- const Cluster::Connections& cons,
+ broker::Broker& broker, const ClusterMap& m, uint64_t frameId_,
+ const Cluster::ConnectionVector& cons, Decoder& decoder_,
const boost::function<void()>& ok,
const boost::function<void(const std::exception&)>& fail,
const client::ConnectionSettings& cs
)
: updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m),
- eventId(eventId_), frameId(frameId_), connections(cons),
+ frameId(frameId_), connections(cons), decoder(decoder_),
connection(catchUpConnection()), shadowConnection(catchUpConnection()),
done(ok), failed(fail), connectionSettings(cs)
{
@@ -130,7 +131,6 @@
ClusterConnectionMembershipBody membership;
map.toMethodBody(membership);
- membership.setEventId(eventId);
membership.setFrameId(frameId);
AMQFrame frame(membership);
client::ConnectionAccess::getImpl(connection)->handle(frame);
@@ -232,7 +232,8 @@
connectionSettings.maxFrameSize = bc.getFrameMax();
shadowConnection.open(updateeUrl, connectionSettings);
bc.eachSessionHandler(boost::bind(&UpdateClient::updateSession, this, _1));
- std::pair<const char*, size_t> fragment = updateConnection->getDecoder().getFragment();
+ // Safe to use decoder here because we are stalled for update.
+ std::pair<const char*, size_t> fragment = decoder.get(updateConnection->getId()).getFragment();
ClusterConnectionProxy(shadowConnection).shadowReady(
updateConnection->getId().getMember(),
updateConnection->getId().getNumber(),
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h?rev=751557&r1=751556&r2=751557&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h Sun Mar 8 23:52:35 2009
@@ -46,6 +46,7 @@
class DeliveryRecord;
class SessionState;
class SemanticState;
+class Decoder;
} // namespace broker
@@ -54,6 +55,7 @@
class Cluster;
class Connection;
class ClusterMap;
+class Decoder;
/**
* A client that updates the contents of a local broker to a remote one using AMQP.
@@ -63,8 +65,8 @@
static const std::string UPDATE; // Name for special update queue and exchange.
UpdateClient(const MemberId& updater, const MemberId& updatee, const Url&,
- broker::Broker& donor, const ClusterMap& map, uint64_t eventId, uint64_t frameId,
- const std::vector<boost::intrusive_ptr<Connection> >& ,
+ broker::Broker& donor, const ClusterMap& map, uint64_t frameId,
+ const std::vector<boost::intrusive_ptr<Connection> >&, Decoder&,
const boost::function<void()>& done,
const boost::function<void(const std::exception&)>& fail,
const client::ConnectionSettings&
@@ -92,9 +94,9 @@
Url updateeUrl;
broker::Broker& updaterBroker;
ClusterMap map;
- uint64_t eventId;
uint64_t frameId;
std::vector<boost::intrusive_ptr<Connection> > connections;
+ Decoder& decoder;
client::Connection connection, shadowConnection;
client::AsyncSession session, shadowSession;
boost::function<void()> done;
Modified: qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp?rev=751557&r1=751556&r2=751557&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp Sun Mar 8 23:52:35 2009
@@ -35,13 +35,14 @@
subchannel=0;
channel=0;
encodedSizeCache = 0;
+ clusterId = 0;
}
AMQFrame::AMQFrame(const boost::intrusive_ptr<AMQBody>& b) : body(b) { init(); }
AMQFrame::AMQFrame(const AMQBody& b) : body(b.clone()) { init(); }
-AMQFrame::~AMQFrame() {}
+AMQFrame::~AMQFrame() { init(); }
AMQBody* AMQFrame::getBody() {
// Non-const AMQBody* may be used to modify the body.
Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=751557&r1=751556&r2=751557&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Sun Mar 8 23:52:35 2009
@@ -132,7 +132,6 @@
<control name="membership" code="0x21" label="Cluster membership details.">
<field name="joiners" type="map"/> <!-- member-id -> URL -->
<field name="members" type="map"/> <!-- member-id -> state -->
- <field name="event-id" type="uint64"/>> <!-- Event id counter value -->
<field name="frame-id" type="uint64"/>> <!-- Frame id counter value -->
</control>
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org