You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/04/11 16:29:07 UTC
svn commit: r764204 - in /qpid/trunk/qpid/cpp: src/ src/qpid/amqp_0_10/
src/qpid/broker/ src/qpid/cluster/ src/tests/ xml/
Author: aconway
Date: Sat Apr 11 14:29:04 2009
New Revision: 764204
URL: http://svn.apache.org/viewvc?rev=764204&view=rev
Log:
Fix issues when cluster is run with persistence enabled.
- Handle partial failures (e.g. due to disk error): failing brokers shut down, others continue.
- Enable persistence in cluster tests.
- Correct message status in DeliveryRecord updates.
- Remove qpid.update queue when update complete - avoid it becoming persistent
Added:
qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp (with props)
qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h (with props)
qpid/trunk/qpid/cpp/src/tests/PartialFailure.cpp (with props)
qpid/trunk/qpid/cpp/src/tests/test_store.cpp (with props)
Modified:
qpid/trunk/qpid/cpp/src/cluster.mk
qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h
qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h
qpid/trunk/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h
qpid/trunk/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h
qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h
qpid/trunk/qpid/cpp/src/tests/ClusterFixture.cpp
qpid/trunk/qpid/cpp/src/tests/ClusterFixture.h
qpid/trunk/qpid/cpp/src/tests/ForkedBroker.cpp
qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h
qpid/trunk/qpid/cpp/src/tests/Makefile.am
qpid/trunk/qpid/cpp/src/tests/cluster.mk
qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
qpid/trunk/qpid/cpp/src/tests/clustered_replication_test
qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp
qpid/trunk/qpid/cpp/src/tests/run_failover_soak
qpid/trunk/qpid/cpp/src/tests/start_cluster
qpid/trunk/qpid/cpp/xml/cluster.xml
Modified: qpid/trunk/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/cluster.mk?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ qpid/trunk/qpid/cpp/src/cluster.mk Sat Apr 11 14:29:04 2009
@@ -56,6 +56,8 @@
qpid/cluster/Dispatchable.h \
qpid/cluster/UpdateClient.cpp \
qpid/cluster/UpdateClient.h \
+ qpid/cluster/ErrorCheck.cpp \
+ qpid/cluster/ErrorCheck.h \
qpid/cluster/Event.cpp \
qpid/cluster/Event.h \
qpid/cluster/EventFrame.h \
@@ -70,6 +72,7 @@
qpid/cluster/Multicaster.h \
qpid/cluster/McastFrameHandler.h \
qpid/cluster/NoOpConnectionOutputHandler.h \
+ qpid/cluster/StallConnectionOutputHandler.h \
qpid/cluster/OutputInterceptor.cpp \
qpid/cluster/OutputInterceptor.h \
qpid/cluster/PollerDispatch.cpp \
Modified: qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp Sat Apr 11 14:29:04 2009
@@ -88,6 +88,7 @@
}
catch(const SessionException& e) {
QPID_LOG(error, "Execution exception: " << e.what());
+ executionException(e.code, e.what()); // Let subclass handle this first.
framing::AMQP_AllProxy::Execution execution(channel);
AMQMethodBody* m = f.getMethod();
SequenceNumber commandId;
@@ -98,6 +99,7 @@
}
catch(const ChannelException& e){
QPID_LOG(error, "Channel exception: " << e.what());
+ channelException(e.code, e.what()); // Let subclass handle this first.
peer.detached(name, e.code);
}
catch(const ConnectionException& e) {
Modified: qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h Sat Apr 11 14:29:04 2009
@@ -87,8 +87,9 @@
QPID_COMMON_EXTERN virtual void invoke(const framing::AMQMethodBody& m);
virtual void setState(const std::string& sessionName, bool force) = 0;
- virtual void channelException(framing::session::DetachCode code, const std::string& msg) = 0;
virtual void connectionException(framing::connection::CloseCode code, const std::string& msg) = 0;
+ virtual void channelException(framing::session::DetachCode, const std::string& msg) = 0;
+ virtual void executionException(framing::execution::ErrorCode, const std::string& msg) = 0;
virtual void detaching() = 0;
// Notification of events
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Sat Apr 11 14:29:04 2009
@@ -57,7 +57,8 @@
mgmtObject(0),
links(broker_.getLinks()),
agent(0),
- timer(broker_.getTimer())
+ timer(broker_.getTimer()),
+ errorListener(0)
{
Manageable* parent = broker.GetVhostObject();
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Sat Apr 11 14:29:04 2009
@@ -66,6 +66,17 @@
public RefCounted
{
public:
+ /**
+ * Listener that can be registered with a Connection to be informed of errors.
+ */
+ class ErrorListener
+ {
+ public:
+ virtual ~ErrorListener() {}
+ virtual void sessionError(uint16_t channel, const std::string&) = 0;
+ virtual void connectionError(const std::string&) = 0;
+ };
+
Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isLink = false, uint64_t objectId = 0);
~Connection ();
@@ -101,6 +112,9 @@
const std::string& getMgmtId() const { return mgmtId; }
management::ManagementAgent* getAgent() const { return agent; }
void setFederationLink(bool b);
+ /** Connection does not delete the listener. 0 resets. */
+ void setErrorListener(ErrorListener* l) { errorListener=l; }
+ ErrorListener* getErrorListener() { return errorListener; }
void setHeartbeatInterval(uint16_t heartbeat);
void sendHeartbeat();
@@ -112,6 +126,7 @@
void sendClose();
void setSecureConnection(SecureConnection* secured);
+
private:
typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
@@ -128,6 +143,8 @@
management::ManagementAgent* agent;
Timer& timer;
boost::intrusive_ptr<TimerTask> heartbeatTimer;
+ ErrorListener* errorListener;
+
public:
qmf::org::apache::qpid::broker::Connection* getMgmtObject() { return mgmtObject; }
};
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp Sat Apr 11 14:29:04 2009
@@ -64,13 +64,16 @@
void ConnectionHandler::handle(framing::AMQFrame& frame)
{
AMQMethodBody* method=frame.getBody()->getMethod();
+ Connection::ErrorListener* errorListener = handler->connection.getErrorListener();
try{
if (!invoke(static_cast<AMQP_AllOperations::ConnectionHandler&>(*handler.get()), *method)) {
handler->connection.getChannel(frame.getChannel()).in(frame);
}
}catch(ConnectionException& e){
+ if (errorListener) errorListener->connectionError(e.what());
handler->proxy.close(e.code, e.what());
}catch(std::exception& e){
+ if (errorListener) errorListener->connectionError(e.what());
handler->proxy.close(541/*internal error*/, e.what());
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Sat Apr 11 14:29:04 2009
@@ -45,14 +45,20 @@
MethodId methodId(AMQMethodBody* m) { return m ? m->amqpClassId() : 0; }
} // namespace
-void SessionHandler::channelException(framing::session::DetachCode, const std::string&) {
- handleDetach();
-}
-
void SessionHandler::connectionException(framing::connection::CloseCode code, const std::string& msg) {
+ // NOTE: must tell the error listener _before_ calling connection.close()
+ if (connection.getErrorListener()) connection.getErrorListener()->connectionError(msg);
connection.close(code, msg);
}
+void SessionHandler::channelException(framing::session::DetachCode, const std::string& msg) {
+ if (connection.getErrorListener()) connection.getErrorListener()->sessionError(getChannel(), msg);
+}
+
+void SessionHandler::executionException(framing::execution::ErrorCode, const std::string& msg) {
+ if (connection.getErrorListener()) connection.getErrorListener()->sessionError(getChannel(), msg);
+}
+
ConnectionState& SessionHandler::getConnection() { return connection; }
const ConnectionState& SessionHandler::getConnection() const { return connection; }
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h Sat Apr 11 14:29:04 2009
@@ -73,8 +73,9 @@
virtual void setState(const std::string& sessionName, bool force);
virtual qpid::SessionState* getState();
virtual framing::FrameHandler* getInHandler();
- virtual void channelException(framing::session::DetachCode code, const std::string& msg);
virtual void connectionException(framing::connection::CloseCode code, const std::string& msg);
+ virtual void channelException(framing::session::DetachCode, const std::string& msg);
+ virtual void executionException(framing::execution::ErrorCode, const std::string& msg);
virtual void detaching();
virtual void readyToSend();
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Sat Apr 11 14:29:04 2009
@@ -36,6 +36,7 @@
#include "qpid/framing/ClusterConfigChangeBody.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
+#include "qpid/framing/ClusterErrorCheckBody.h"
#include "qpid/framing/ClusterReadyBody.h"
#include "qpid/framing/ClusterShutdownBody.h"
#include "qpid/framing/ClusterUpdateOfferBody.h"
@@ -63,6 +64,7 @@
using namespace qpid::sys;
using namespace std;
using namespace qpid::cluster;
+using namespace qpid::framing::cluster;
using qpid::management::ManagementAgent;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
@@ -77,9 +79,10 @@
void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); }
void ready(const std::string& url) { cluster.ready(member, url, l); }
- void configChange(const std::string& addresses) { cluster.configChange(member, addresses, l); }
+ void configChange(const std::string& current) { cluster.configChange(member, current, l); }
void updateOffer(uint64_t updatee, const Uuid& id) { cluster.updateOffer(member, updatee, id, l); }
void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); }
+ void errorCheck(uint8_t type, uint64_t seq) { cluster.errorCheck(member, type, seq, l); }
void shutdown() { cluster.shutdown(member, l); }
bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
@@ -112,7 +115,8 @@
discarding(true),
state(INIT),
lastSize(0),
- lastBroker(false)
+ lastBroker(false),
+ error(*this)
{
mAgent = ManagementAgent::Singleton::getInstance();
if (mAgent != 0){
@@ -195,14 +199,19 @@
leave(l);
}
+#define LEAVE_TRY(STMT) try { STMT; } \
+ catch (const std::exception& e) { \
+ QPID_LOG(warning, *this << " error leaving cluster: " << e.what()); \
+ } do {} while(0)
+
void Cluster::leave(Lock&) {
if (state != LEFT) {
state = LEFT;
QPID_LOG(notice, *this << " leaving cluster " << name);
- try { broker.shutdown(); }
- catch (const std::exception& e) {
- QPID_LOG(critical, *this << " error during broker shutdown: " << e.what());
- }
+ // Finalize connections now to avoid problems later in destructor.
+ LEAVE_TRY(localConnections.clear());
+ LEAVE_TRY(connections.clear());
+ LEAVE_TRY(broker.shutdown());
}
}
@@ -254,10 +263,22 @@
QPID_LOG(trace, *this << " DROP: " << e);
}
+void Cluster::flagError(Connection& connection, ErrorCheck::ErrorType type) {
+ Mutex::ScopedLock l(lock);
+ error.error(connection, type, map.getFrameSeq(), map.getMembers());
+}
+
// Handler for deliverFrameQueue.
// This thread executes the main logic.
void Cluster::deliveredFrame(const EventFrame& e) {
Mutex::ScopedLock l(lock);
+ // Process each frame through the error checker.
+ error.delivered(e);
+ while (error.canProcess()) // There is a frame ready to process.
+ processFrame(error.getNext(), l);
+}
+
+void Cluster::processFrame(const EventFrame& e, Lock& l) {
if (e.isCluster()) {
QPID_LOG(trace, *this << " DLVR: " << e);
ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l);
@@ -265,7 +286,8 @@
throw Exception(QPID_MSG("Invalid cluster control"));
}
else if (state >= CATCHUP) {
- QPID_LOG(trace, *this << " DLVR: " << e);
+ map.incrementFrameSeq();
+ QPID_LOG(trace, *this << " DLVR " << map.getFrameSeq() << ": " << e);
ConnectionPtr connection = getConnection(e.connectionId, l);
if (connection)
connection->deliveredFrame(e);
@@ -357,8 +379,8 @@
broker.getQueueEvents().enable();
}
-void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& l) {
- bool memberChange = map.configChange(addresses);
+void Cluster::configChange(const MemberId&, const std::string& current, Lock& l) {
+ bool memberChange = map.configChange(current);
if (state == LEFT) return;
if (!map.isAlive(self)) { // Final config change.
@@ -600,8 +622,13 @@
}
std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
- static const char* STATE[] = { "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT" };
- return o << cluster.self << "(" << STATE[cluster.state] << ")";
+ static const char* STATE[] = {
+ "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT"
+ };
+ assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1);
+ o << cluster.self << "(" << STATE[cluster.state];
+ if (cluster.error.isUnresolved()) o << "/error";
+ return o << ")";
}
MemberId Cluster::getId() const {
@@ -635,4 +662,13 @@
expiryPolicy->deliverExpire(id);
}
+void Cluster::errorCheck(const MemberId& , uint8_t type, uint64_t frameSeq, Lock&) {
+ // If we receive an errorCheck here, it's because we have processed past the point
+ // of the error so respond with ERROR_TYPE_NONE
+ assert(map.getFrameSeq() >= frameSeq);
+ if (type != framing::cluster::ERROR_TYPE_NONE) // Don't respond if it's already NONE.
+ mcast.mcastControl(
+ ClusterErrorCheckBody(ProtocolVersion(), framing::cluster::ERROR_TYPE_NONE, frameSeq), self);
+}
+
}} // namespace qpid::cluster
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Sat Apr 11 14:29:04 2009
@@ -23,6 +23,7 @@
#include "ClusterSettings.h"
#include "Cpg.h"
#include "Decoder.h"
+#include "ErrorCheck.h"
#include "Event.h"
#include "EventFrame.h"
#include "ExpiryPolicy.h"
@@ -105,6 +106,10 @@
void deliverFrame(const EventFrame&);
+ // Called in deliverFrame thread to indicate an error from the broker.
+ void flagError(Connection&, ErrorCheck::ErrorType);
+ void connectionError();
+
// Called only during update by Connection::shadowReady
Decoder& getDecoder() { return decoder; }
@@ -132,13 +137,15 @@
// == Called in deliverFrameQueue thread
void deliveredFrame(const EventFrame&);
+ void processFrame(const EventFrame&, Lock&);
// Cluster controls implement XML methods from cluster.xml.
void updateRequest(const MemberId&, const std::string&, Lock&);
void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, Lock&);
void ready(const MemberId&, const std::string&, Lock&);
- void configChange(const MemberId&, const std::string& addresses, Lock& l);
+ void configChange(const MemberId&, const std::string& current, Lock& l);
void messageExpired(const MemberId&, uint64_t, Lock& l);
+ void errorCheck(const MemberId&, uint8_t, uint64_t, Lock&);
void shutdown(const MemberId&, Lock&);
// Helper functions
@@ -216,11 +223,13 @@
Decoder decoder;
bool discarding;
+
// Remaining members are protected by lock.
- // FIXME aconway 2009-03-06: Most of these members are also only used in
+
+ // TODO aconway 2009-03-06: Most of these members are also only used in
// deliverFrameQueue thread or during stall. Review and separate members
// that require a lock, drop lock when not needed.
- //
+
mutable sys::Monitor lock;
@@ -243,7 +252,7 @@
bool lastBroker;
sys::Thread updateThread;
boost::optional<ClusterMap> updatedMap;
-
+ ErrorCheck error;
friend std::ostream& operator<<(std::ostream&, const Cluster&);
friend class ClusterDispatcher;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp Sat Apr 11 14:29:04 2009
@@ -33,6 +33,13 @@
namespace cluster {
+ClusterMap::Set ClusterMap::decode(const std::string& s) {
+ Set set;
+ for (std::string::const_iterator i = s.begin(); i < s.end(); i += 8)
+ set.insert(MemberId(std::string(i, i+8)));
+ return set;
+}
+
namespace {
void addFieldTableValue(FieldTable::ValueMap::value_type vt, ClusterMap::Map& map, ClusterMap::Set& set) {
@@ -54,9 +61,9 @@
}
-ClusterMap::ClusterMap() {}
+ClusterMap::ClusterMap() : frameSeq(0) {}
-ClusterMap::ClusterMap(const MemberId& id, const Url& url , bool isMember) {
+ClusterMap::ClusterMap(const MemberId& id, const Url& url , bool isMember) : frameSeq(0) {
alive.insert(id);
if (isMember)
members[id] = url;
@@ -64,7 +71,9 @@
joiners[id] = url;
}
-ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt) {
+ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt, uint64_t frameSeq_)
+ : frameSeq(frameSeq_)
+{
std::for_each(joinersFt.begin(), joinersFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(joiners), boost::ref(alive)));
std::for_each(membersFt.begin(), membersFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(members), boost::ref(alive)));
}
@@ -78,22 +87,7 @@
}
b.getMembers().clear();
std::for_each(members.begin(), members.end(), boost::bind(&insertFieldTableFromMapValue, boost::ref(b.getMembers()), _1));
-}
-
-bool ClusterMap::configChange(
- cpg_address *current, int nCurrent,
- cpg_address *left, int nLeft,
- cpg_address */*joined*/, int /*nJoined*/)
-{
- cpg_address* a;
- bool memberChange=false;
- for (a = left; a != left+nLeft; ++a) {
- memberChange = memberChange || members.erase(*a);
- joiners.erase(*a);
- }
- alive.clear();
- std::copy(current, current+nCurrent, std::inserter(alive, alive.end()));
- return memberChange;
+ b.setFrameSeq(frameSeq);
}
Url ClusterMap::getUrl(const Map& map, const MemberId& id) {
@@ -123,8 +117,13 @@
return urls;
}
-ClusterMap::Set ClusterMap::getAlive() const {
- return alive;
+ClusterMap::Set ClusterMap::getAlive() const { return alive; }
+
+ClusterMap::Set ClusterMap::getMembers() const {
+ Set s;
+ std::transform(members.begin(), members.end(), std::inserter(s, s.begin()),
+ boost::bind(&Map::value_type::first, _1));
+ return s;
}
std::ostream& operator<<(std::ostream& o, const ClusterMap::Map& m) {
@@ -158,7 +157,7 @@
bool ClusterMap::configChange(const std::string& addresses) {
bool memberChange = false;
- Set update;
+ Set update = decode(addresses);
for (std::string::const_iterator i = addresses.begin(); i < addresses.end(); i += 8)
update.insert(MemberId(std::string(i, i+8)));
Set removed;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h Sat Apr 11 14:29:04 2009
@@ -38,26 +38,26 @@
namespace qpid {
namespace cluster {
+typedef std::set<MemberId> MemberSet;
+
/**
- * Map of established cluster members and joiners waiting for an update.
+ * Map of established cluster members and joiners waiting for an update,
+ * along with other cluster state that must be updated.
*/
class ClusterMap {
public:
typedef std::map<MemberId, Url> Map;
typedef std::set<MemberId> Set;
+ static Set decode(const std::string&);
+
ClusterMap();
ClusterMap(const MemberId& id, const Url& url, bool isReady);
- ClusterMap(const framing::FieldTable& urls, const framing::FieldTable& states);
+ ClusterMap(const framing::FieldTable& joiners, const framing::FieldTable& members, uint64_t frameSeq);
/** Update from config change.
*@return true if member set changed.
*/
- bool configChange(
- cpg_address *current, int nCurrent,
- cpg_address *left, int nLeft,
- cpg_address *joined, int nJoined);
-
bool configChange(const std::string& addresses);
bool isJoiner(const MemberId& id) const { return joiners.find(id) != joiners.end(); }
@@ -78,6 +78,7 @@
std::vector<std::string> memberIds() const;
std::vector<Url> memberUrls() const;
Set getAlive() const;
+ Set getMembers() const;
bool updateRequest(const MemberId& id, const std::string& url);
/** Return non-empty Url if accepted */
@@ -90,11 +91,16 @@
* Utility method to return intersection of two member sets
*/
static Set intersection(const Set& a, const Set& b);
+
+ uint64_t getFrameSeq() { return frameSeq; }
+ uint64_t incrementFrameSeq() { return ++frameSeq; }
+
private:
Url getUrl(const Map& map, const MemberId& id);
Map joiners, members;
Set alive;
+ uint64_t frameSeq;
friend std::ostream& operator<<(std::ostream&, const Map&);
friend std::ostream& operator<<(std::ostream&, const ClusterMap&);
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Sat Apr 11 14:29:04 2009
@@ -56,8 +56,16 @@
namespace cluster {
using namespace framing;
+using namespace framing::cluster;
+
+qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL);
+
+Connection::NullFrameHandler Connection::nullFrameHandler;
+
+struct NullFrameHandler : public framing::FrameHandler {
+ void handle(framing::AMQFrame&) {}
+};
-NoOpConnectionOutputHandler Connection::discardHandler;
namespace {
sys::AtomicValue<uint64_t> idCounter;
@@ -89,6 +97,8 @@
connection.setClusterOrderOutput(nullFrameHandler); // Passive, discard cluster-order frames
connection.setClientThrottling(false); // Disable client throttling, done by active node.
}
+ if (!isCatchUp())
+ connection.setErrorListener(this);
}
void Connection::giveReadCredit(int credit) {
@@ -97,6 +107,7 @@
}
Connection::~Connection() {
+ connection.setErrorListener(0);
QPID_LOG(debug, cluster << " deleted connection: " << *this);
}
@@ -126,7 +137,7 @@
cluster.addShadowConnection(this);
AMQFrame ok((ConnectionCloseOkBody()));
connection.getOutput().send(ok);
- output.closeOutput(discardHandler);
+ output.closeOutput();
catchUp = false;
}
else
@@ -156,8 +167,8 @@
{
if (f.type == DATA) // incoming data frames to broker::Connection
connection.received(const_cast<AMQFrame&>(f.frame));
- else { // frame control, send frame via SessionState
- broker::SessionState* ss = connection.getChannel(f.frame.getChannel()).getSession();
+ else { // frame control, send frame via SessionState
+ broker::SessionState* ss = connection.getChannel(currentChannel).getSession();
if (ss) ss->out(const_cast<AMQFrame&>(f.frame));
}
}
@@ -180,7 +191,7 @@
// This was a local replicated connection. Multicast a deliver
// closed and process any outstanding frames from the cluster
// until self-delivery of deliver-close.
- output.closeOutput(discardHandler);
+ output.closeOutput();
cluster.getMulticast().mcastControl(ClusterConnectionDeliverCloseBody(), self);
}
}
@@ -275,13 +286,14 @@
QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadowId);
self = shadowId;
connection.setUserId(username);
- // OK to use decoder here because we are stalled for update.
+ // OK to use decoder here because cluster is stalled for update.
cluster.getDecoder().get(self).setFragment(fragment.data(), fragment.size());
+ connection.setErrorListener(this);
}
-void Connection::membership(const FieldTable& joiners, const FieldTable& members) {
+void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameSeq) {
QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
- cluster.updateInDone(ClusterMap(joiners, members));
+ cluster.updateInDone(ClusterMap(joiners, members, frameSeq));
self.second = 0; // Mark this as completed update connection.
}
@@ -305,7 +317,9 @@
}
broker::QueuedMessage Connection::getUpdateMessage() {
- broker::QueuedMessage m = findQueue(UpdateClient::UPDATE)->get();
+ shared_ptr<broker::Queue> updateq = findQueue(UpdateClient::UPDATE);
+ assert(!updateq->isDurable());
+ broker::QueuedMessage m = updateq->get();
if (!m.payload) throw Exception(QPID_MSG(cluster << " empty update queue"));
return m;
}
@@ -342,15 +356,15 @@
// If the message was unacked, the newbie broker must place
// it in its messageStore.
- if ( m.payload && m.payload->isPersistent() && !completed && !ended && !accepted && !cancelled )
+ if ( m.payload && m.payload->isPersistent() && acquired && !ended)
queue->enqueue ( 0, m.payload );
}
void Connection::queuePosition(const string& qname, const SequenceNumber& position) {
- shared_ptr<broker::Queue> q = cluster.getBroker().getQueues().find(qname);
- if (!q) throw InvalidArgumentException(QPID_MSG("Invalid queue name " << qname));
- q->setPosition(position);
-}
+ shared_ptr<broker::Queue> q = cluster.getBroker().getQueues().find(qname);
+ if (!q) throw InvalidArgumentException(QPID_MSG("Invalid queue name " << qname));
+ q->setPosition(position);
+ }
void Connection::expiryId(uint64_t id) {
cluster.getExpiryPolicy().setId(id);
@@ -407,7 +421,14 @@
QPID_LOG(debug, cluster << " decoded queue " << q->getName());
}
-qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL);
+void Connection::sessionError(uint16_t , const std::string& ) {
+ cluster.flagError(*this, ERROR_TYPE_SESSION);
+
+}
+
+void Connection::connectionError(const std::string& ) {
+ cluster.flagError(*this, ERROR_TYPE_CONNECTION);
+}
}} // namespace qpid::cluster
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Sat Apr 11 14:29:04 2009
@@ -25,7 +25,6 @@
#include "types.h"
#include "WriteEstimate.h"
#include "OutputInterceptor.h"
-#include "NoOpConnectionOutputHandler.h"
#include "EventFrame.h"
#include "McastFrameHandler.h"
@@ -58,7 +57,8 @@
class Connection :
public RefCounted,
public sys::ConnectionInputHandler,
- public framing::AMQP_AllOperations::ClusterConnectionHandler
+ public framing::AMQP_AllOperations::ClusterConnectionHandler,
+ private broker::Connection::ErrorListener
{
public:
@@ -120,7 +120,7 @@
void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment);
- void membership(const framing::FieldTable&, const framing::FieldTable&);
+ void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameSeq);
void deliveryRecord(const std::string& queue,
const framing::SequenceNumber& position,
@@ -156,6 +156,13 @@
void handle(framing::AMQFrame&) {}
};
+
+ static NullFrameHandler nullFrameHandler;
+
+ // Error listener functions
+ void connectionError(const std::string&);
+ void sessionError(uint16_t channel, const std::string&);
+
void init();
bool checkUnsupported(const framing::AMQBody& body);
void deliverClose();
@@ -167,8 +174,6 @@
broker::SemanticState& semanticState();
broker::QueuedMessage getUpdateMessage();
- static NoOpConnectionOutputHandler discardHandler;
-
Cluster& cluster;
ConnectionId self;
bool catchUp;
@@ -181,7 +186,6 @@
boost::shared_ptr<broker::TxBuffer> txBuffer;
bool expectProtocolHeader;
McastFrameHandler mcastFrameHandler;
- NullFrameHandler nullFrameHandler;
static qpid::sys::AtomicValue<uint64_t> catchUpId;
Added: qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp?rev=764204&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp Sat Apr 11 14:29:04 2009
@@ -0,0 +1,120 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ErrorCheck.h"
+#include "EventFrame.h"
+#include "ClusterMap.h"
+#include "Cluster.h"
+#include "qpid/framing/ClusterErrorCheckBody.h"
+#include "qpid/framing/ClusterConfigChangeBody.h"
+#include "qpid/log/Statement.h"
+
+#include <algorithm>
+
+namespace qpid {
+namespace cluster {
+
+using namespace std;
+using namespace framing;
+using namespace framing::cluster;
+
+ErrorCheck::ErrorCheck(Cluster& c)
+ : cluster(c), mcast(c.getMulticast()), frameSeq(0), type(ERROR_TYPE_NONE), connection(0)
+{}
+
+// Print every member of the set, each followed by a single space.
+// The set is taken by const reference so that logging it does not copy it.
+ostream& operator<<(ostream& o, const ErrorCheck::MemberSet& ms) {
+    copy(ms.begin(), ms.end(), ostream_iterator<MemberId>(o, " "));
+    return o;
+}
+
+/**
+ * Record a locally detected error and multicast an error-check control
+ * carrying its type and frame sequence to the cluster.
+ *
+ *@param c connection involved in the error; its address is remembered.
+ *@param t kind of error, must not be ERROR_TYPE_NONE.
+ *@param seq frame sequence number identifying the error.
+ *@param ms initial set of members whose outcome is still unresolved.
+ */
+void ErrorCheck::error(Connection& c, ErrorType t, uint64_t seq, const MemberSet& ms)
+{
+    assert(t != ERROR_TYPE_NONE);    // Caller must report a real error.
+    assert(type == ERROR_TYPE_NONE); // No other error may already be pending.
+
+    // Enter the error state.
+    connection = &c;
+    frameSeq = seq;
+    unresolved = ms;
+    type = t;
+
+    const char* const scope = (type == ERROR_TYPE_SESSION) ? " Session" : " Connection";
+    QPID_LOG(debug, cluster << scope << " error " << frameSeq << " unresolved: " << unresolved);
+    mcast.mcastControl(ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), cluster.getId());
+}
+
+/**
+ * Handle a frame delivered from the cluster.
+ *
+ * While no error is pending the frame is simply queued. While an error is
+ * unresolved:
+ *  - an error-check control with the same frame sequence is consumed: if the
+ *    sender's error type is less than ours we throw, otherwise the sender is
+ *    removed from the unresolved set;
+ *  - any other frame is queued, and a config-change additionally restricts
+ *    the unresolved set to members present in the new current membership.
+ *
+ *@throw Exception if our error did not occur on the sending member.
+ */
+void ErrorCheck::delivered(const EventFrame& e) {
+    // No error pending: queue the frame for normal processing.
+    if (!isUnresolved()) {
+        frames.push_back(e);
+        return;
+    }
+
+    const ClusterErrorCheckBody* errorCheck =
+        dynamic_cast<const ClusterErrorCheckBody*>(e.frame.getMethod());
+    if (errorCheck && errorCheck->getFrameSeq() == frameSeq) { // Same error
+        if (errorCheck->getType() < type) { // my error is worse than his
+            QPID_LOG(critical, cluster << " Error " << frameSeq << " did not occur on " << e.getMemberId());
+            throw Exception("Aborted by local failure that did not occur on all replicas");
+        }
+        // His error is worse/same as mine. This is the expected, healthy
+        // outcome, so it is logged at debug level like checkResolved() does.
+        QPID_LOG(debug, cluster << " Error " << frameSeq << " outcome agrees with " << e.getMemberId());
+        unresolved.erase(e.getMemberId());
+        checkResolved();
+        return;
+    }
+
+    frames.push_back(e); // Only drop matching errorCheck controls.
+
+    const ClusterConfigChangeBody* configChange =
+        dynamic_cast<const ClusterConfigChangeBody*>(e.frame.getMethod());
+    if (configChange) {
+        // Keep only unresolved members that are in the new current membership.
+        MemberSet members(ClusterMap::decode(configChange->getCurrent()));
+        MemberSet result;
+        set_intersection(members.begin(), members.end(),
+                         unresolved.begin(), unresolved.end(),
+                         inserter(result, result.begin()));
+        unresolved.swap(result);
+        checkResolved();
+    }
+}
+
+void ErrorCheck::checkResolved() {
+ if (unresolved.empty()) { // No more potentially conflicted members, we're clear.
+ type = ERROR_TYPE_NONE;
+ QPID_LOG(debug, cluster << " Error " << frameSeq << " resolved.");
+ }
+ else
+ QPID_LOG(debug, cluster << " Error " << frameSeq << " still unresolved: " << unresolved);
+}
+
+// Remove and return the oldest queued frame. Precondition: canProcess().
+EventFrame ErrorCheck::getNext() {
+    assert(canProcess());
+    EventFrame next = frames.front();
+    frames.pop_front();
+    return next;
+}
+
+bool ErrorCheck::canProcess() const {
+ return type == ERROR_TYPE_NONE && !frames.empty();
+}
+
+bool ErrorCheck::isUnresolved() const {
+ return type != ERROR_TYPE_NONE;
+}
+
+}} // namespace qpid::cluster
Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h?rev=764204&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h Sat Apr 11 14:29:04 2009
@@ -0,0 +1,80 @@
+#ifndef QPID_CLUSTER_ERRORCHECK_H
+#define QPID_CLUSTER_ERRORCHECK_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+#include "Multicaster.h"
+#include "qpid/framing/enum.h"
+#include <boost/function.hpp>
+#include <deque>
+#include <set>
+
+namespace qpid {
+namespace cluster {
+
+class EventFrame;
+class ClusterMap;
+class Cluster;
+class Multicaster;
+class Connection;
+
+/**
+ * Error checking logic.
+ *
+ * When an error occurs stop processing frames and queue them until we
+ * can determine if all nodes experienced the error. If not, we shut down.
+ *
+ * Delivered frames are queued by delivered() and handed back one at a
+ * time by getNext(), which may only be called while canProcess() is true.
+ */
+class ErrorCheck
+{
+ public:
+ typedef std::set<MemberId> MemberSet;
+ typedef framing::cluster::ErrorType ErrorType;
+
+ ErrorCheck(Cluster&);
+
+ /** A local error has occurred: enter the error state and multicast an
+ * error-check control. The MemberSet is the initial set of unresolved members.
+ */
+ void error(Connection&, ErrorType, uint64_t frameSeq, const MemberSet&);
+
+ /** Called when a frame is delivered */
+ void delivered(const EventFrame&);
+
+ /** Remove and return the oldest queued frame. Requires canProcess(). */
+ EventFrame getNext();
+
+ /** True if no error is pending and there is at least one queued frame. */
+ bool canProcess() const;
+ /** True while a local error is pending (type is not ERROR_TYPE_NONE). */
+ bool isUnresolved() const;
+
+ private:
+ /** Clear the error state if the unresolved set has become empty. */
+ void checkResolved();
+
+ Cluster& cluster;
+ Multicaster& mcast;
+ // Frames delivered but not yet handed out by getNext().
+ std::deque<EventFrame> frames;
+ // Members that have not yet been resolved for the pending error.
+ std::set<MemberId> unresolved;
+ // Frame sequence number of the pending error.
+ uint64_t frameSeq;
+ // Type of the pending error, ERROR_TYPE_NONE when there is none.
+ ErrorType type;
+ // Connection passed to the most recent error() call, 0 initially.
+ Connection* connection;
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_ERRORCHECK_H*/
Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/ErrorCheck.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h Sat Apr 11 14:29:04 2009
@@ -45,6 +45,7 @@
bool isCluster() const { return connectionId.getNumber() == 0; }
bool isConnection() const { return connectionId.getNumber() != 0; }
bool isLastInEvent() const { return readCredit; }
+ MemberId getMemberId() const { return connectionId.getMember(); }
ConnectionId connectionId;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/LockedConnectionMap.h Sat Apr 11 14:29:04 2009
@@ -52,6 +52,8 @@
return 0;
}
+ void clear() { sys::Mutex::ScopedLock l(lock); map.clear(); }
+
private:
typedef std::map<ConnectionId, ConnectionPtr> Map;
mutable sys::Mutex lock;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h Sat Apr 11 14:29:04 2009
@@ -30,8 +30,7 @@
namespace cluster {
/**
- * Output handler for frames sent to noop connections.
- * Simply discards frames.
+ * Output handler for shadow connections; simply discards frames.
*/
class NoOpConnectionOutputHandler : public sys::ConnectionOutputHandler
{
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Sat Apr 11 14:29:04 2009
@@ -32,8 +32,9 @@
using namespace framing;
-OutputInterceptor::OutputInterceptor(
- cluster::Connection& p, sys::ConnectionOutputHandler& h)
+NoOpConnectionOutputHandler OutputInterceptor::discardHandler;
+
+OutputInterceptor::OutputInterceptor(Connection& p, sys::ConnectionOutputHandler& h)
: parent(p), closing(false), next(&h), sent(),
writeEstimate(p.getCluster().getWriteEstimate()),
moreOutput(), doingOutput()
@@ -111,10 +112,10 @@
QPID_LOG(trace, parent << "Send doOutput request for " << request);
}
-void OutputInterceptor::closeOutput(sys::ConnectionOutputHandler& h) {
+void OutputInterceptor::closeOutput() {
sys::Mutex::ScopedLock l(lock);
closing = true;
- next = &h;
+ next = &discardHandler;
}
void OutputInterceptor::close() {
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h Sat Apr 11 14:29:04 2009
@@ -23,6 +23,7 @@
*/
#include "WriteEstimate.h"
+#include "NoOpConnectionOutputHandler.h"
#include "qpid/sys/ConnectionOutputHandler.h"
#include "qpid/broker/ConnectionFactory.h"
#include "qpid/sys/LatencyMetric.h"
@@ -53,7 +54,7 @@
// Intercept doOutput requests on Connection.
bool doOutput();
- void closeOutput(sys::ConnectionOutputHandler& h);
+ void closeOutput();
cluster::Connection& parent;
@@ -70,6 +71,7 @@
WriteEstimate writeEstimate;
bool moreOutput;
bool doingOutput;
+ static NoOpConnectionOutputHandler discardHandler;
};
}} // namespace qpid::cluster
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Sat Apr 11 14:29:04 2009
@@ -125,15 +125,19 @@
// Update queue is used to transfer acquired messages that are no longer on their original queue.
session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true);
session.sync();
- session.close();
std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1));
+ session.queueDelete(arg::queue=UPDATE);
+ session.close();
+
+
ClusterConnectionProxy(session).expiryId(expiry.getId());
ClusterConnectionMembershipBody membership;
map.toMethodBody(membership);
AMQFrame frame(membership);
client::ConnectionAccess::getImpl(connection)->handle(frame);
+
connection.close();
QPID_LOG(debug, updaterId << " updated state to " << updateeId << " at " << updateeUrl);
}
@@ -202,7 +206,6 @@
sb.get()->send(transfer, message.payload->getFrames());
if (message.payload->isContentReleased()){
uint16_t maxFrameSize = sb.get()->getConnection()->getNegotiatedSettings().maxFrameSize;
-
uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
bool morecontent = true;
for (uint64_t offset = 0; morecontent; offset += maxContentSize)
Modified: qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h (original)
+++ qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h Sat Apr 11 14:29:04 2009
@@ -114,10 +114,12 @@
SessionType session;
qpid::client::SubscriptionManager subs;
qpid::client::LocalQueue lq;
- ClientT(uint16_t port, const std::string& name=std::string())
- : connection(port), session(connection.newSession(name)), subs(session) {}
- ClientT(const qpid::client::ConnectionSettings& settings, const std::string& name=std::string())
- : connection(settings), session(connection.newSession(name)), subs(session) {}
+ std::string name;
+
+ ClientT(uint16_t port, const std::string& name_=std::string())
+ : connection(port), session(connection.newSession(name_)), subs(session), name(name_) {}
+ ClientT(const qpid::client::ConnectionSettings& settings, const std::string& name_=std::string())
+ : connection(settings), session(connection.newSession(name_)), subs(session), name(name_) {}
~ClientT() { connection.close(); }
};
Modified: qpid/trunk/qpid/cpp/src/tests/ClusterFixture.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ClusterFixture.cpp?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ClusterFixture.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/ClusterFixture.cpp Sat Apr 11 14:29:04 2009
@@ -67,16 +67,23 @@
add(n);
}
+// Create a fixture with n brokers and a freshly generated Uuid as cluster name.
+// updateArgs_ is stored; makeArgs() calls it, when set, with each broker's
+// argument list and index.
+ClusterFixture::ClusterFixture(size_t n, int localIndex_, boost::function<void (Args&, size_t)> updateArgs_)
+    : name(Uuid(true).str()),
+      localIndex(localIndex_),
+      updateArgs(updateArgs_)
+{
+    add(n);
+}
+
const ClusterFixture::Args ClusterFixture::DEFAULT_ARGS =
list_of<string>("--auth=no")("--no-data-dir");
-ClusterFixture::Args ClusterFixture::makeArgs(const std::string& prefix) {
+ClusterFixture::Args ClusterFixture::makeArgs(const std::string& prefix, size_t index) {
Args args = list_of<string>("qpidd " __FILE__)
("--no-module-dir")
("--load-module=../.libs/cluster.so")
("--cluster-name")(name)
("--log-prefix")(prefix);
args.insert(args.end(), userArgs.begin(), userArgs.end());
+ if (updateArgs) updateArgs(args, index);
return args;
}
@@ -84,7 +91,7 @@
if (size() != size_t(localIndex)) { // fork a broker process.
std::ostringstream os; os << "fork" << size();
std::string prefix = os.str();
- forkedBrokers.push_back(shared_ptr<ForkedBroker>(new ForkedBroker(makeArgs(prefix))));
+ forkedBrokers.push_back(shared_ptr<ForkedBroker>(new ForkedBroker(makeArgs(prefix, size()))));
push_back(forkedBrokers.back()->getPort());
}
else { // Run in this process
@@ -106,7 +113,7 @@
assert(int(size()) == localIndex);
ostringstream os; os << "local" << localIndex;
string prefix = os.str();
- Args args(makeArgs(prefix));
+ Args args(makeArgs(prefix, localIndex));
vector<const char*> argv(args.size());
transform(args.begin(), args.end(), argv.begin(), boost::bind(&string::c_str, _1));
qpid::log::Logger::instance().setPrefix(prefix);
@@ -131,3 +138,22 @@
kill(n,sig);
try { c.close(); } catch(...) {}
}
+
+/**
+ * Collect the port of the first address of every broker URL known to a Connection.
+ *@param n if non-negative, poll until the connection reports exactly n brokers,
+ * giving up after 100 polls spaced 0.1 seconds apart (about 10 seconds).
+ */
+std::set<int> knownBrokerPorts(qpid::client::Connection& source, int n) {
+    std::vector<qpid::Url> urls = source.getKnownBrokers();
+    if (n >= 0) {
+        const unsigned expected = unsigned(n);
+        size_t retriesLeft = 100;
+        while (urls.size() != expected && retriesLeft != 0) {
+            qpid::sys::usleep(1000*100); // 0.1 secs
+            urls = source.getKnownBrokers();
+            --retriesLeft;
+        }
+    }
+    std::set<int> ports;
+    typedef std::vector<qpid::Url>::const_iterator UrlIter;
+    for (UrlIter url = urls.begin(); url != urls.end(); ++url)
+        ports.insert((*url)[0].get<qpid::TcpAddress>()->port);
+    return ports;
+}
Modified: qpid/trunk/qpid/cpp/src/tests/ClusterFixture.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ClusterFixture.h?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ClusterFixture.h (original)
+++ qpid/trunk/qpid/cpp/src/tests/ClusterFixture.h Sat Apr 11 14:29:04 2009
@@ -38,6 +38,7 @@
#include "qpid/log/Logger.h"
#include <boost/bind.hpp>
+#include <boost/function.hpp>
#include <boost/shared_ptr.hpp>
#include <string>
@@ -69,33 +70,44 @@
class ClusterFixture : public vector<uint16_t> {
public:
typedef std::vector<std::string> Args;
+ static const Args DEFAULT_ARGS;
+
/** @param localIndex can be -1 meaning don't automatically start a local broker.
* A local broker can be started with addLocal().
*/
ClusterFixture(size_t n, int localIndex=0, const Args& args=DEFAULT_ARGS);
+
+ /**@param updateArgs function is passed the index of the cluster member and can update the arguments. */
+ ClusterFixture(size_t n, int localIndex, boost::function<void (Args&, size_t)> updateArgs);
+
void add(size_t n) { for (size_t i=0; i < n; ++i) add(); }
void add(); // Add a broker.
void setup();
bool hasLocal() const;
- /** Kill a forked broker with sig, or shutdown localBroker if n==0. */
+ /** Kill a forked broker with sig, or shutdown localBroker. */
void kill(size_t n, int sig=SIGINT);
/** Kill a broker and suppressing errors from closing connection c. */
void killWithSilencer(size_t n, client::Connection& c, int sig=SIGINT);
private:
- static const Args DEFAULT_ARGS;
void addLocal(); // Add a local broker.
- Args makeArgs(const std::string& prefix);
+ Args makeArgs(const std::string& prefix, size_t index);
string name;
std::auto_ptr<BrokerFixture> localBroker;
int localIndex;
std::vector<shared_ptr<ForkedBroker> > forkedBrokers;
Args userArgs;
+ boost::function<void (Args&, size_t)> updateArgs;
};
+/**
+ * Get the known broker ports from a Connection.
+ *@param n if specified wait for the cluster size to be n, up to a timeout.
+ */
+std::set<int> knownBrokerPorts(qpid::client::Connection& source, int n=-1);
#endif /*!CLUSTER_FIXTURE_H*/
Modified: qpid/trunk/qpid/cpp/src/tests/ForkedBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.cpp?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ForkedBroker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/ForkedBroker.cpp Sat Apr 11 14:29:04 2009
@@ -20,12 +20,17 @@
*/
#include "ForkedBroker.h"
+#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
+#include <boost/algorithm/string.hpp>
#include <algorithm>
#include <stdlib.h>
#include <sys/types.h>
#include <signal.h>
+using namespace std;
+using qpid::ErrnoException;
+
ForkedBroker::ForkedBroker(const Args& args) { init(args); }
ForkedBroker::ForkedBroker(int argc, const char* const argv[]) { init(Args(argv, argc+argv)); }
@@ -42,14 +47,25 @@
pid = 0; // Reset pid here in case of an exception.
using qpid::ErrnoException;
if (::kill(savePid, sig) < 0)
- throw ErrnoException("kill failed");
+ throw ErrnoException("kill failed");
int status;
if (::waitpid(savePid, &status, 0) < 0 && sig != 9)
throw ErrnoException("wait for forked process failed");
if (WEXITSTATUS(status) != 0 && sig != 9)
throw qpid::Exception(QPID_MSG("Forked broker exited with: " << WEXITSTATUS(status)));
}
+
+// NOTE(review): operator<< is left in namespace std, presumably so that
+// argument-dependent lookup from inside QPID_LOG can find it for
+// ForkedBroker::Args -- confirm before moving it. Strictly speaking, adding
+// declarations to namespace std is undefined behaviour.
+namespace std {
+// Print the arguments separated (and followed) by a single space.
+static ostream& operator<<(ostream& o, const ForkedBroker::Args& a) {
+    copy(a.begin(), a.end(), ostream_iterator<string>(o, " "));
+    return o;
+}
+}
+
+namespace {
+// True for command line arguments that configure logging, i.e. those
+// starting with "--log-enable" or "--trace". Kept file-local instead of
+// being injected into namespace std with external linkage.
+bool isLogOption(const std::string& s) {
+    return boost::starts_with(s, "--log-enable") || boost::starts_with(s, "--trace");
+}
+}
void ForkedBroker::init(const Args& userArgs) {
using qpid::ErrnoException;
@@ -70,17 +86,19 @@
}
else { // child
::close(pipeFds[0]);
- // FIXME aconway 2009-02-12:
int fd = ::dup2(pipeFds[1], 1); // pipe stdout to the parent.
if (fd < 0) throw ErrnoException("dup2 failed");
const char* prog = "../qpidd";
Args args(userArgs);
args.push_back("--port=0");
- if (!::getenv("QPID_TRACE") && !::getenv("QPID_LOG_ENABLE"))
- args.push_back("--log-enable=error+"); // Keep quiet except for errors.
+ // Keep quiet except for errors.
+ if (!::getenv("QPID_TRACE") && !::getenv("QPID_LOG_ENABLE")
+ && find_if(userArgs.begin(), userArgs.end(), isLogOption) == userArgs.end())
+ args.push_back("--log-enable=error+");
std::vector<const char*> argv(args.size());
std::transform(args.begin(), args.end(), argv.begin(), boost::bind(&std::string::c_str, _1));
argv.push_back(0);
+ QPID_LOG(debug, "ForkedBroker exec " << prog << ": " << args);
execv(prog, const_cast<char* const*>(&argv[0]));
throw ErrnoException("execv failed");
}
Modified: qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h (original)
+++ qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h Sat Apr 11 14:29:04 2009
@@ -53,6 +53,7 @@
~ForkedBroker();
void kill(int sig=SIGINT);
+ int wait(); // Wait for exit, return exit status.
uint16_t getPort() { return port; }
pid_t getPID() { return pid; }
Modified: qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/tests/Makefile.am Sat Apr 11 14:29:04 2009
@@ -110,11 +110,17 @@
# amqp_0_10/Map.cpp \
# amqp_0_10/handlers.cpp
+TESTLIBFLAGS = -module -rpath $(abs_builddir)
check_LTLIBRARIES += libshlibtest.la
-libshlibtest_la_LDFLAGS = -module -rpath $(abs_builddir)
+libshlibtest_la_LDFLAGS = $(TESTLIBFLAGS)
libshlibtest_la_SOURCES = shlibtest.cpp
+check_LTLIBRARIES += test_store.la
+test_store_la_SOURCES = test_store.cpp
+test_store_la_LIBADD = $(lib_broker) # FIXME aconway 2009-04-03: required?
+test_store_la_LDFLAGS = $(TESTLIBFLAGS)
+
include cluster.mk
if SSL
include ssl.mk
Added: qpid/trunk/qpid/cpp/src/tests/PartialFailure.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/PartialFailure.cpp?rev=764204&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/PartialFailure.cpp (added)
+++ qpid/trunk/qpid/cpp/src/tests/PartialFailure.cpp Sat Apr 11 14:29:04 2009
@@ -0,0 +1,222 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**@file Tests for partial failure in a cluster.
+ * Partial failure means some nodes experience a failure while others do not.
+ * In this case the failed nodes must shut down.
+ */
+
+#include "test_tools.h"
+#include "unit_test.h"
+#include "ClusterFixture.h"
+#include <boost/assign.hpp>
+#include <boost/algorithm/string.hpp>
+#include <boost/bind.hpp>
+
+QPID_AUTO_TEST_SUITE(PartialFailureTestSuite)
+
+ using namespace std;
+using namespace qpid;
+using namespace qpid::cluster;
+using namespace qpid::framing;
+using namespace qpid::client;
+using namespace qpid::client::arg;
+using namespace boost::assign;
+using broker::Broker;
+using boost::shared_ptr;
+
+// Timeout for tests that wait for messages
+const sys::Duration TIMEOUT=sys::TIME_SEC/4;
+
+// True if s is a "--log-enable..." command line option.
+static bool isLogOption(const std::string& s) {
+    return boost::starts_with(s, "--log-enable");
+}
+
+/**
+ * Add the per-broker arguments needed by the partial failure tests:
+ * a test store named "s<index>", the test_store module, a fresh temporary
+ * data directory and --auth=no.
+ *
+ *@param args argument list to extend in place.
+ *@param index index of the cluster member the arguments are for.
+ *@throw ErrnoException if the temporary data directory cannot be created.
+ */
+void updateArgs(ClusterFixture::Args& args, size_t index) {
+    ostringstream os;
+    os << "--test-store-name=s" << index;
+    args.push_back(os.str());
+    args.push_back("--load-module=.libs/test_store.so");
+
+    // mkdtemp() rewrites the XXXXXX template in place, so it must be given a
+    // writable buffer rather than the const storage behind string::c_str().
+    char dataDir[] = "/tmp/PartialFailure.XXXXXX";
+    if (!mkdtemp(dataDir))
+        throw ErrnoException("Can't create data dir");
+    args.push_back(string("--data-dir=") + dataDir);
+    args.push_back("--auth=no");
+
+    // These tests generate errors deliberately, disable error logging unless a log env var is set.
+    if (!::getenv("QPID_TRACE") && !::getenv("QPID_LOG_ENABLE")) {
+        // remove_if only reorders the elements; erase() is needed to actually drop them.
+        args.erase(remove_if(args.begin(), args.end(), isLogOption), args.end());
+        args.push_back("--log-enable=critical+:DISABLED"); // hacky way to disable logs.
+    }
+}
+
+// Build a Message from data and q with its delivery mode set to PERSISTENT.
+Message pMessage(string data, string q) {
+    Message persistent(data, q);
+    persistent.getDeliveryProperties().setDeliveryMode(PERSISTENT);
+    return persistent;
+}
+
+// Declare a durable queue named after the client and subscribe the client's
+// local queue to it.
+void queueAndSub(Client& c) {
+    const std::string& queueName = c.name;
+    c.session.queueDeclare(queueName, durable=true);
+    c.subs.subscribe(c.lq, queueName);
+}
+
+// Verify normal cluster-wide errors: errors that are provoked on every
+// member must leave all three brokers in the cluster.
+QPID_AUTO_TEST_CASE(testNormalErrors) {
+ // FIXME aconway 2009-04-10: Would like to put a scope just around
+ // the statements expected to fail (in BOOST_CHECK_THROW) but that
+ // sporadically lets out messages, possibly because they're in
+ // Connection thread.
+ ScopedSuppressLogging allQuiet;
+
+ ClusterFixture cluster(3, -1, updateArgs);
+ Client c0(cluster[0], "c0");
+ Client c1(cluster[1], "c1");
+ Client c2(cluster[2], "c2");
+
+ // Sanity check: a message sent to c0's queue is received by c0.
+ queueAndSub(c0);
+ c0.session.messageTransfer(content=Message("x", "c0"));
+ BOOST_CHECK_EQUAL(c0.lq.get(TIMEOUT).getData(), "x");
+
+ // Session error.
+ BOOST_CHECK_THROW(c0.session.exchangeBind(), SessionException);
+ c1.session.messageTransfer(content=Message("stay", "c0")); // Will stay on queue, session c0 is dead.
+
+ // Connection error, kill c1 on all members.
+ // The message names the test stores s0, s1 and s2, i.e. all three brokers.
+ queueAndSub(c1);
+ BOOST_CHECK_THROW(
+ c1.session.messageTransfer(
+ content=pMessage("TEST_STORE_DO: s0[exception] s1[exception] s2[exception] testNormalErrors", "c1")),
+ ConnectionException);
+ c2.session.messageTransfer(content=Message("stay", "c1")); // Will stay on queue, session/connection c1 is dead.
+
+ // All three brokers are still known and both "stay" messages can be fetched via c2.
+ BOOST_CHECK_EQUAL(3u, knownBrokerPorts(c2.connection, 3).size());
+ BOOST_CHECK_EQUAL(c2.subs.get("c0", TIMEOUT).getData(), "stay");
+ BOOST_CHECK_EQUAL(c2.subs.get("c1", TIMEOUT).getData(), "stay");
+}
+
+
+// Test errors after a new member joins to verify frame-sequence-numbers are ok in update.
+// Store names s0, s1, ... follow the broker index, as assigned by updateArgs.
+QPID_AUTO_TEST_CASE(testErrorAfterJoin) {
+ ScopedSuppressLogging allQuiet;
+
+ // Start with a single broker holding one persistent message.
+ ClusterFixture cluster(1, -1, updateArgs);
+ Client c0(cluster[0]);
+ c0.session.queueDeclare("q", durable=true);
+ c0.session.messageTransfer(content=pMessage("a", "q"));
+
+ // Kill the new guy
+ cluster.add();
+ Client c1(cluster[1]);
+ c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] testErrorAfterJoin", "q"));
+ BOOST_CHECK_THROW(c1.session.messageTransfer(content=pMessage("xxx", "q")), TransportFailure);
+ // Only the original broker is expected to remain.
+ BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c0.connection, 1).size());
+
+ // Kill the old guy
+ cluster.add();
+ Client c2(cluster[2]);
+ c2.session.messageTransfer(content=pMessage("TEST_STORE_DO: s0[exception] testErrorAfterJoin2", "q"));
+ BOOST_CHECK_THROW(c0.session.messageTransfer(content=pMessage("xxx", "q")), TransportFailure);
+
+ // Only the broker c2 is connected to is expected to remain.
+ BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c2.connection, 1).size());
+}
+
+// Test that if one member fails and others do not, the failure leaves the cluster.
+QPID_AUTO_TEST_CASE(testSinglePartialFailure) {
+ ScopedSuppressLogging allQuiet;
+
+ ClusterFixture cluster(3, -1, updateArgs);
+ Client c0(cluster[0], "c0");
+ Client c1(cluster[1], "c1");
+ Client c2(cluster[2], "c2");
+
+ c0.session.queueDeclare("q", durable=true);
+ c0.session.messageTransfer(content=pMessage("a", "q"));
+ // Cause partial failure on c1
+ // (store s1 belongs to the broker at index 1, which c1 is connected to).
+ c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] testSinglePartialFailure", "q"));
+ BOOST_CHECK_THROW(c1.session.queueQuery("q"), TransportFailure);
+
+ c0.session.messageTransfer(content=pMessage("b", "q"));
+ // Expected on the queue: "a", the TEST_STORE_DO message and "b".
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 3u);
+ BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c0.connection, 2).size());
+
+ // Cause partial failure on c2
+ c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s2[exception] testSinglePartialFailure2", "q"));
+ BOOST_CHECK_THROW(c2.session.queueQuery("q"), TransportFailure);
+
+ c0.session.messageTransfer(content=pMessage("c", "q"));
+ // Two more messages were sent to the queue since the previous count.
+ BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 5u);
+ BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c0.connection, 1).size());
+}
+
+// Test multiple partial failures: 2 fail 2 pass
+QPID_AUTO_TEST_CASE(testMultiPartialFailure) {
+ ScopedSuppressLogging allQuiet;
+
+ ClusterFixture cluster(4, -1, updateArgs);
+ Client c0(cluster[0], "c0");
+ Client c1(cluster[1], "c1");
+ Client c2(cluster[2], "c2");
+ Client c3(cluster[3], "c3");
+
+ c0.session.queueDeclare("q", durable=true);
+ c0.session.messageTransfer(content=pMessage("a", "q"));
+
+ // Cause partial failure on c1, c2
+ c0.session.messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception] s2[exception] testMultiPartialFailure", "q"));
+ BOOST_CHECK_THROW(c1.session.queueQuery("q"), TransportFailure);
+ BOOST_CHECK_THROW(c2.session.queueQuery("q"), TransportFailure);
+
+ // The two surviving brokers (reached via c0 and c3) must keep working.
+ c0.session.messageTransfer(content=pMessage("b", "q"));
+ c3.session.messageTransfer(content=pMessage("c", "q"));
+ // Expected on the queue: "a", the TEST_STORE_DO message, "b" and "c".
+ BOOST_CHECK_EQUAL(c3.session.queueQuery("q").getMessageCount(), 4u);
+ BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c0.connection, 2).size());
+}
+
+/** FIXME aconway 2009-04-10:
+ * The current approach to shutting down a process in test_store
+ * sometimes leads to assertion failures and errors in the shut-down
+ * process. Need a cleaner solution
+ */
+#if 0
+QPID_AUTO_TEST_CASE(testPartialFailureMemberLeaves) {
+ ScopedSuppressLogging allQuiet;
+
+ ClusterFixture cluster(2, -1, updateArgs);
+ Client c0(cluster[0], "c0");
+ Client c1(cluster[1], "c1");
+
+ c0.session.queueDeclare("q", durable=true);
+ c0.session.messageTransfer(content=pMessage("a", "q"));
+
+ // Cause failure on member 0 and simultaneous crash on member 1.
+ BOOST_CHECK_THROW(
+ c0.session.messageTransfer(
+ content=pMessage("TEST_STORE_DO: s0[exception] s1[exit_process] testPartialFailureMemberLeaves", "q")),
+ ConnectionException);
+ cluster.wait(1);
+
+ Client c00(cluster[0], "c00"); // Old connection is dead.
+ BOOST_CHECK_EQUAL(c00.session.queueQuery("q").getMessageCount(), 1u);
+ BOOST_CHECK_EQUAL(1u, knownBrokerPorts(c00.connection, 1).size());
+}
+#endif
+
+
+QPID_AUTO_TEST_SUITE_END()
Propchange: qpid/trunk/qpid/cpp/src/tests/PartialFailure.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/trunk/qpid/cpp/src/tests/PartialFailure.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: qpid/trunk/qpid/cpp/src/tests/cluster.mk
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster.mk?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster.mk (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster.mk Sat Apr 11 14:29:04 2009
@@ -34,8 +34,10 @@
federated_cluster_test clustered_replication_test
check_PROGRAMS+=cluster_test
-cluster_test_SOURCES=unit_test.cpp cluster_test.cpp ClusterFixture.cpp ClusterFixture.h ForkedBroker.h ForkedBroker.cpp
-cluster_test_LDADD=$(lib_client) ../cluster.la -lboost_unit_test_framework
+cluster_test_SOURCES=unit_test.cpp ClusterFixture.cpp ClusterFixture.h ForkedBroker.h ForkedBroker.cpp \
+ cluster_test.cpp PartialFailure.cpp
+
+cluster_test_LDADD=$(lib_client) ../cluster.la test_store.la -lboost_unit_test_framework
unit_test_LDADD+=../cluster.la
Modified: qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Sat Apr 11 14:29:04 2009
@@ -73,7 +73,7 @@
ostream& operator<<(ostream& o, const cpg_name* n) {
- return o << cluster::Cpg::str(*n);
+ return o << Cpg::str(*n);
}
ostream& operator<<(ostream& o, const cpg_address& a) {
@@ -89,29 +89,12 @@
return o;
}
-template <class C> set<uint16_t> makeSet(const C& c) {
- set<uint16_t> s;
+template <class C> set<int> makeSet(const C& c) {
+ set<int> s;
copy(c.begin(), c.end(), inserter(s, s.begin()));
return s;
}
-template <class T> set<uint16_t> knownBrokerPorts(T& source, int n=-1) {
- vector<Url> urls = source.getKnownBrokers();
- if (n >= 0 && unsigned(n) != urls.size()) {
- BOOST_MESSAGE("knownBrokerPorts waiting for " << n << ": " << urls);
- // Retry up to 10 secs in .1 second intervals.
- for (size_t retry=100; urls.size() != unsigned(n) && retry != 0; --retry) {
- sys::usleep(1000*100); // 0.1 secs
- urls = source.getKnownBrokers();
- }
- }
- BOOST_MESSAGE("knownBrokerPorts expecting " << n << ": " << urls);
- set<uint16_t> s;
- for (vector<Url>::const_iterator i = urls.begin(); i != urls.end(); ++i)
- s.insert((*i)[0].get<TcpAddress>()->port);
- return s;
-}
-
class Sender {
public:
Sender(boost::shared_ptr<ConnectionImpl> ci, uint16_t ch) : connection(ci), channel(ch) {}
@@ -175,7 +158,6 @@
QPID_AUTO_TEST_CASE(testAcl) {
ofstream policyFile("cluster_test.acl");
- // FIXME aconway 2009-02-12: guest -> qpidd?
policyFile << "acl allow foo@QPID create queue name=foo" << endl
<< "acl allow foo@QPID create queue name=foo2" << endl
<< "acl deny foo@QPID create queue name=bar" << endl
@@ -446,13 +428,13 @@
QPID_AUTO_TEST_CASE(testConnectionKnownHosts) {
ClusterFixture cluster(1);
Client c0(cluster[0], "c0");
- set<uint16_t> kb0 = knownBrokerPorts(c0.connection);
+ set<int> kb0 = knownBrokerPorts(c0.connection);
BOOST_CHECK_EQUAL(kb0.size(), 1u);
BOOST_CHECK_EQUAL(kb0, makeSet(cluster));
cluster.add();
Client c1(cluster[1], "c1");
- set<uint16_t> kb1 = knownBrokerPorts(c1.connection);
+ set<int> kb1 = knownBrokerPorts(c1.connection);
kb0 = knownBrokerPorts(c0.connection, 2);
BOOST_CHECK_EQUAL(kb1.size(), 2u);
BOOST_CHECK_EQUAL(kb1, makeSet(cluster));
@@ -460,7 +442,7 @@
cluster.add();
Client c2(cluster[2], "c2");
- set<uint16_t> kb2 = knownBrokerPorts(c2.connection);
+ set<int> kb2 = knownBrokerPorts(c2.connection);
kb1 = knownBrokerPorts(c1.connection, 3);
kb0 = knownBrokerPorts(c0.connection, 3);
BOOST_CHECK_EQUAL(kb2.size(), 3u);
Modified: qpid/trunk/qpid/cpp/src/tests/clustered_replication_test
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/clustered_replication_test?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/clustered_replication_test (original)
+++ qpid/trunk/qpid/cpp/src/tests/clustered_replication_test Sat Apr 11 14:29:04 2009
@@ -23,6 +23,7 @@
# failures:
srcdir=`dirname $0`
PYTHON_DIR=$srcdir/../../../python
+export PYTHONPATH=$PYTHON_DIR
trap stop_brokers INT EXIT
Modified: qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp Sat Apr 11 14:29:04 2009
@@ -220,63 +220,13 @@
cout << "\n\n\n\n";
}
-
- /*
- Only call this if you already know there is at least
- one child still running. Supply a time in seconds.
- If it has been at least that long since a shild stopped
- running, we judge the system to have hung.
- */
- int
- hanging ( int hangTime )
- {
- struct timeval now,
- duration;
- gettimeofday ( &now, 0 );
-
- int how_many_hanging = 0;
-
- vector<child *>::iterator i;
- for ( i = begin(); i != end(); ++ i )
- {
- //Not in POSIX
- //timersub ( & now, &((*i)->startTime), & duration );
- duration.tv_sec = now.tv_sec - (*i)->startTime.tv_sec;
- duration.tv_usec = now.tv_usec - (*i)->startTime.tv_usec;
- if (duration.tv_usec < 0) {
- --duration.tv_sec;
- duration.tv_usec += 1000000;
- }
-
- if ( (COMPLETED != (*i)->status) // child isn't done running
- &&
- ( duration.tv_sec >= hangTime ) // it's been too long
- )
- {
- std::cerr << "Child of type "
- << (*i)->type
- << " hanging. "
- << "PID is "
- << (*i)->pid
- << endl;
- ++ how_many_hanging;
- }
- }
-
- return how_many_hanging;
- }
-
-
int verbosity;
};
-
children allMyChildren;
-
-
void
childExit ( int )
{
@@ -389,6 +339,7 @@
("--log-prefix")
(prefix.str())
("--log-to-file")
+ ("--log-enable=error+")
(prefix.str()+".log");
if (endsWith(moduleOrDir, "cluster.so")) {
@@ -818,16 +769,6 @@
return ERROR_ON_CHILD;
}
- // If one is hanging, quit.
- if ( allMyChildren.hanging ( 120 ) )
- {
- /*
- * Don't kill any processes. Leave alive for questioning.
- * */
- std::cerr << "END_OF_TEST ERROR_HANGING\n";
- return HANGING;
- }
-
if ( verbosity > 1 ) {
std::cerr << "------- next kill-broker loop --------\n";
allMyChildren.print();
Modified: qpid/trunk/qpid/cpp/src/tests/run_failover_soak
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/run_failover_soak?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/run_failover_soak (original)
+++ qpid/trunk/qpid/cpp/src/tests/run_failover_soak Sat Apr 11 14:29:04 2009
@@ -51,5 +51,6 @@
VERBOSITY=${VERBOSITY:-1}
DURABILITY=${DURABILITY:-0}
+rm -f soak-*.log
exec ./failover_soak $MODULES ./declare_queues ./replaying_sender ./resuming_receiver $MESSAGES $REPORT_FREQUENCY $VERBOSITY $DURABILITY
Modified: qpid/trunk/qpid/cpp/src/tests/start_cluster
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/start_cluster?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/start_cluster (original)
+++ qpid/trunk/qpid/cpp/src/tests/start_cluster Sat Apr 11 14:29:04 2009
@@ -28,15 +28,17 @@
echo $* | newgrp ais
}
-rm -f cluster*.log
-SIZE=${1:-1}; shift
+rm -f cluster*.log cluster.ports qpidd.port
+
+SIZE=${1:-3}; shift
CLUSTER=`pwd` # Cluster name=pwd, avoid clashes.
-OPTS="-d --no-module-dir --load-module ../.libs/cluster.so --cluster-name=$CLUSTER --no-data-dir --auth=no $@"
+OPTS="-d --no-module-dir --load-module ../.libs/cluster.so --cluster-name=$CLUSTER --auth=no $@"
for (( i=0; i<SIZE; ++i )); do
- PORT=`with_ais_group ../qpidd -p0 --log-to-file=cluster$i.log $OPTS` || exit 1
+ DDIR=`mktemp -d /tmp/start_cluster.XXXXXXXXXX`
+ PORT=`with_ais_group ../qpidd -p0 --log-to-file=cluster$i.log $OPTS --data-dir=$DDIR` || exit 1
echo $PORT >> cluster.ports
done
-head cluster.ports > qpidd.port # First member's port for tests.
+head -n 1 cluster.ports > qpidd.port # First member's port for tests.
Added: qpid/trunk/qpid/cpp/src/tests/test_store.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/test_store.cpp?rev=764204&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/test_store.cpp (added)
+++ qpid/trunk/qpid/cpp/src/tests/test_store.cpp Sat Apr 11 14:29:04 2009
@@ -0,0 +1,146 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+/**@file
+ * Plug-in message store for tests.
+ *
+ * Add functionality as required, build up a comprehensive set of
+ * features to support persistent behavior tests.
+ *
+ * Current features: special "action" messages can:
+ * - raise exception from enqueue.
+ * - force host process to exit.
+ * - do async completion after a delay.
+ */
+
+#include "qpid/broker/NullMessageStore.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/log/Statement.h"
+#include "qpid/Plugin.h"
+#include "qpid/Options.h"
+#include <boost/algorithm/string.hpp>
+#include <boost/cast.hpp>
+#include <boost/lexical_cast.hpp>
+
+using namespace qpid;
+using namespace broker;
+using namespace std;
+using namespace boost;
+using namespace qpid::sys;
+
+struct TestStoreOptions : public Options { // Options accepted by the test store plug-in.
+
+ string name; // Bound to the "test-store-name" option; TestStorePlugin passes it to the TestStore it creates.
+
+ TestStoreOptions() : Options("Test Store Options") {
+ addOptions()
+ ("test-store-name", optValue(name, "NAME"), "Name to identify test store instance.");
+ }
+};
+
+struct Completer : public Runnable { // Completes a message's enqueue after a delay; created by TestStore::enqueue for "async " actions.
+ intrusive_ptr<PersistableMessage> message; // Message on which run() calls enqueueComplete().
+ int usecs; // Delay handed to qpid::sys::usleep() before completing.
+ Completer(intrusive_ptr<PersistableMessage> m, int u) : message(m), usecs(u) {}
+ void run() {
+ qpid::sys::usleep(usecs);
+ message->enqueueComplete();
+ delete this; // Self-deleting: TestStore::enqueue allocates with new and nothing else in this file frees it.
+ }
+};
+
+class TestStore : public NullMessageStore { // NullMessageStore subclass whose enqueue() acts on special "action" messages.
+ public:
+ TestStore(const string& name_, Broker& broker_) : name(name_), broker(broker_) {}
+
+ ~TestStore() {
+ for_each(threads.begin(), threads.end(), boost::bind(&Thread::join, _1)); // Join every thread created for async completions.
+ }
+
+ void enqueue(TransactionContext* ,
+ const boost::intrusive_ptr<PersistableMessage>& msg,
+ const PersistableQueue& )
+ {
+ string data = polymorphic_downcast<Message*>(msg.get())->getFrames().getContent(); // Message content as a string.
+
+ // Check the message for special instructions.
+ size_t i, j; // i: position of "<name>[" in data; j: position of the first "]" at or after i.
+ if (starts_with(data, TEST_STORE_DO) // Content must begin with the "TEST_STORE_DO: " prefix ...
+ && (i = data.find(name+"[")) != string::npos // ... and contain "<name>[" for this store's name ...
+ && (j = data.find("]", i)) != string::npos) // ... followed by a closing "]".
+ {
+ size_t start = i+name.size()+1; // Index of the first character after "<name>[".
+ string action = data.substr(start, j-start); // Text between the brackets.
+
+ if (action == EXCEPTION) { // "exception": fail this enqueue by throwing.
+ throw Exception(QPID_MSG("TestStore " << name << " throwing exception for: " << data));
+ }
+ else if (action == EXIT_PROCESS) { // "exit_process": terminate the host process.
+ // FIXME aconway 2009-04-10: this is a dubious way to
+ // close the process at best, it can cause assertions or seg faults
+ // rather than clean exit.
+ QPID_LOG(critical, "TestStore " << name << " forcing process exit for: " << data);
+ exit(0);
+ }
+ else if (starts_with(action, ASYNC)) { // "async <delay>": complete the enqueue later from a Completer.
+ std::string delayStr(action.substr(ASYNC.size())); // Everything after the "async " prefix.
+ int delay = lexical_cast<int>(delayStr); // Throws boost::bad_lexical_cast if the text is not an int; the value is passed to usleep() by Completer.
+ threads.push_back(Thread(*new Completer(msg, delay))); // The Completer deletes itself at the end of run().
+ }
+ else { // Unrecognized action: log it and complete the enqueue immediately.
+ QPID_LOG(error, "TestStore " << name << " unknown action " << action);
+ msg->enqueueComplete();
+ }
+ }
+ else // No instruction addressed to this store: complete the enqueue immediately.
+ msg->enqueueComplete();
+ }
+
+ private:
+ static const string TEST_STORE_DO, EXCEPTION, EXIT_PROCESS, ASYNC; // Instruction prefix and the recognized action names.
+ string name; // Name matched against "<name>[action]" in message content.
+ Broker& broker; // Stored by the constructor; not otherwise used in this class.
+ vector<Thread> threads; // Threads created for async completions; joined in the destructor.
+};
+
+const string TestStore::TEST_STORE_DO = "TEST_STORE_DO: "; // Prefix that marks message content as carrying store instructions.
+const string TestStore::EXCEPTION = "exception"; // Action: enqueue throws an Exception.
+const string TestStore::EXIT_PROCESS = "exit_process"; // Action: enqueue calls exit(0).
+const string TestStore::ASYNC="async "; // Action prefix: the remainder is parsed as an int delay for Completer.
+
+struct TestStorePlugin : public Plugin { // Plug-in that installs a TestStore on a Broker target.
+
+ TestStoreOptions options; // Exposed through getOptions(); supplies the store name.
+
+ Options* getOptions() { return &options; }
+
+ void earlyInitialize (Plugin::Target& target)
+ {
+ Broker* broker = dynamic_cast<Broker*>(&target);
+ if (!broker) return; // Target is not a Broker: nothing to do.
+ broker->setStore (new TestStore(options.name, *broker)); // NOTE(review): presumably the broker takes ownership of the store - confirm in Broker::setStore.
+ }
+
+ void initialize(qpid::Plugin::Target&) {} // Intentionally empty; the store is installed in earlyInitialize.
+};
+
+static TestStorePlugin pluginInstance; // File-scope instance; presumably registers itself via the Plugin base constructor - confirm in qpid/Plugin.h.
Propchange: qpid/trunk/qpid/cpp/src/tests/test_store.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/trunk/qpid/cpp/src/tests/test_store.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=764204&r1=764203&r2=764204&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Sat Apr 11 14:29:04 2009
@@ -48,6 +48,19 @@
<field name="id" type="uint64"/>
</control>
+ <domain name="error-type" type="uint8" label="Types of error">
+ <enum>
+ <choice name="none" value="0"/>
+ <choice name="session" value="1"/>
+ <choice name="connection" value="2"/>
+ </enum>
+ </domain>
+
+ <control name="error-check" code="0x13">
+ <field name="type" type="error-type"/>
+ <field name="frame-seq" type="uint64"/>
+ </control>
+
<control name="shutdown" code="0x20" label="Shut down entire cluster"/>
</class>
@@ -132,6 +145,7 @@
<control name="membership" code="0x21" label="Cluster membership details.">
<field name="joiners" type="map"/> <!-- member-id -> URL -->
<field name="members" type="map"/> <!-- member-id -> state -->
+ <field name="frame-seq" type="uint64"/> <!-- frame sequence number -->
</control>
<!-- Set the position of a replicated queue. -->
@@ -146,5 +160,6 @@
<!-- Set expiry-id for subsequent messages. -->
<control name="expiry-id" code="0x33"><field name="expiry-id" type="uint64"/></control>
+
</class>
</amqp>
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org