You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/10/05 13:29:14 UTC
svn commit: r821761 [2/6] - in /qpid/branches/java-broker-0-10/qpid: ./ cpp/
cpp/bindings/qmf/ cpp/bindings/qmf/python/ cpp/bindings/qmf/python/qmf/
cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/boost-1.32-support/
cpp/build-aux/ cpp/examples/qmf-...
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Mon Oct 5 11:29:03 2009
@@ -429,13 +429,11 @@
}
}
-
SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) :
HandlerHelper(s),
releaseRedeliveredOp(boost::bind(&SemanticState::release, &state, _1, _2, true)),
releaseOp(boost::bind(&SemanticState::release, &state, _1, _2, false)),
- rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2)),
- acceptOp(boost::bind(&SemanticState::accepted, &state, _1, _2))
+ rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2))
{}
//
@@ -547,8 +545,7 @@
void SessionAdapter::MessageHandlerImpl::accept(const framing::SequenceSet& commands)
{
-
- commands.for_each(acceptOp);
+ state.accepted(commands);
}
framing::MessageAcquireResult SessionAdapter::MessageHandlerImpl::acquire(const framing::SequenceSet& transfers)
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionState.cpp Mon Oct 5 11:29:03 2009
@@ -357,8 +357,7 @@
void SessionState::senderCompleted(const SequenceSet& commands) {
qpid::SessionState::senderCompleted(commands);
- for (SequenceSet::RangeIterator i = commands.rangesBegin(); i != commands.rangesEnd(); i++)
- semanticState.completed(i->first(), i->last());
+ semanticState.completed(commands);
}
void SessionState::readyToSend() {
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/TopicExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/TopicExchange.cpp?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/TopicExchange.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/TopicExchange.cpp Mon Oct 5 11:29:03 2009
@@ -225,12 +225,25 @@
if (reallyUnbind)
unbind(queue, routingPattern, 0);
} else if (fedOp == fedOpReorigin) {
- for (BindingMap::iterator iter = bindings.begin();
- iter != bindings.end(); iter++) {
- const BoundKey& bk = iter->second;
- if (bk.fedBinding.hasLocal()) {
- propagateFedOp(iter->first, string(), fedOpBind, string());
+ /** gather up all the keys that need rebinding in a local vector
+ * while holding the lock. Then propagate once the lock is
+ * released
+ */
+ std::vector<std::string> keys2prop;
+ {
+ RWlock::ScopedRlock l(lock);
+ for (BindingMap::iterator iter = bindings.begin();
+ iter != bindings.end(); iter++) {
+ const BoundKey& bk = iter->second;
+
+ if (bk.fedBinding.hasLocal()) {
+ keys2prop.push_back(iter->first);
+ }
}
+ } /* lock dropped */
+ for (std::vector<std::string>::const_iterator key = keys2prop.begin();
+ key != keys2prop.end(); key++) {
+ propagateFedOp( *key, string(), fedOpBind, string());
}
}
Propchange: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Mon Oct 5 11:29:03 2009
@@ -1,5 +1,7 @@
+no_keyword
.deps
.libs
Makefile
Makefile.in
.dirstamp
+
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/ConnectionHandler.cpp?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/ConnectionHandler.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/ConnectionHandler.cpp Mon Oct 5 11:29:03 2009
@@ -147,11 +147,15 @@
fail("Connection closed before it was established");
break;
case OPEN:
- setState(CLOSING);
- proxy.close(200, OK);
- waitFor(FINISHED);
+ if (setState(CLOSING, OPEN)) {
+ proxy.close(200, OK);
+ waitFor(FINISHED);//FINISHED = CLOSED or FAILED
+ }
+ //else, state was changed from open after we checked, can only
+ //change to failed or closed, so nothing to do
break;
- // Nothing to do for CLOSING, CLOSED, FAILED or NOT_STARTED
+
+ // Nothing to do if already CLOSING, CLOSED, FAILED or if NOT_STARTED
}
}
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/StateManager.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/StateManager.cpp?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/StateManager.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/StateManager.cpp Mon Oct 5 11:29:03 2009
@@ -60,6 +60,18 @@
stateLock.notifyAll();
}
+bool StateManager::setState(int s, int expected)
+{
+ Monitor::ScopedLock l(stateLock);
+ if (state == expected) {
+ state = s;
+ stateLock.notifyAll();
+ return true;
+ } else {
+ return false;
+ }
+}
+
int StateManager::getState() const
{
Monitor::ScopedLock l(stateLock);
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/StateManager.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/StateManager.h?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/StateManager.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/StateManager.h Mon Oct 5 11:29:03 2009
@@ -36,6 +36,7 @@
public:
StateManager(int initial);
void setState(int state);
+ bool setState(int state, int expected);
int getState() const ;
void waitForStateChange(int current);
void waitFor(std::set<int> states);
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cluster.cpp Mon Oct 5 11:29:03 2009
@@ -190,6 +190,7 @@
boost::bind(&Cluster::leave, this),
"Error delivering frames",
poller),
+ quorum(boost::bind(&Cluster::leave, this)),
initialized(false),
decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
discarding(true),
@@ -214,7 +215,6 @@
// Update exchange is used during updates to replicate messages without modifying delivery-properties.exchange.
broker.getExchanges().registerExchange(boost::shared_ptr<broker::Exchange>(new UpdateExchange(this)));
- if (settings.quorum) quorum.init();
cpg.join(name);
// pump the CPG dispatch manually till we get initialized.
while (!initialized)
@@ -226,10 +226,10 @@
}
void Cluster::initialize() {
+ if (settings.quorum) quorum.start(poller);
if (myUrl.empty())
myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT));
- QPID_LOG(notice, *this << " member " << self << " joining "
- << name << " with url=" << myUrl);
+ QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl);
broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
broker.setExpiryPolicy(expiryPolicy);
dispatcher.start();
@@ -315,7 +315,7 @@
// Deliver CPG message.
void Cluster::deliver(
cpg_handle_t /*handle*/,
- cpg_name* /*group*/,
+ const cpg_name* /*group*/,
uint32_t nodeid,
uint32_t pid,
void* msg,
@@ -323,9 +323,11 @@
{
MemberId from(nodeid, pid);
framing::Buffer buf(static_cast<char*>(msg), msg_len);
- Event e(Event::decodeCopy(from, buf));
- LATENCY_TRACK(if (e.getConnectionId().getMember() == self) mcast.cpgLatency.finish());
- deliverEvent(e);
+ while (buf.available()) {
+ Event e(Event::decodeCopy(from, buf));
+ LATENCY_TRACK(if (e.getConnectionId().getMember() == self) mcast.cpgLatency.finish());
+ deliverEvent(e);
+ }
}
LATENCY_TRACK(sys::LatencyTracker<const char*> eventQueueLatencyTracker("EventQueue");)
@@ -404,6 +406,7 @@
LATENCY_TRACK(frameQueueLatencyTracker.finish(e.frame.getBody()));
LATENCY_TRACK(if (e.frame.getBody()->type() == CONTENT_BODY) doOutputTracker.start(e.frame.getBody()));
Mutex::ScopedLock l(lock);
+ if (state == LEFT) return;
EventFrame e(efConst);
const ClusterUpdateOfferBody* offer = castUpdateOffer(e.frame.getBody());
if (offer && error.isUnresolved()) {
@@ -507,10 +510,10 @@
void Cluster::configChange (
cpg_handle_t /*handle*/,
- cpg_name */*group*/,
- cpg_address *current, int nCurrent,
- cpg_address *left, int nLeft,
- cpg_address */*joined*/, int /*nJoined*/)
+ const cpg_name */*group*/,
+ const cpg_address *current, int nCurrent,
+ const cpg_address *left, int nLeft,
+ const cpg_address *joined, int nJoined)
{
Mutex::ScopedLock l(lock);
if (state == INIT) { // First config change.
@@ -518,10 +521,13 @@
broker.setRecovery(nCurrent == 1);
initialized = true;
}
- QPID_LOG(notice, *this << " membership change: " << AddrList(current, nCurrent)
- << AddrList(left, nLeft, "left: "));
+ QPID_LOG(notice, *this << " membership change: "
+ << AddrList(current, nCurrent) << "("
+ << AddrList(joined, nJoined, "joined: ")
+ << AddrList(left, nLeft, "left: ")
+ << ")");
std::string addresses;
- for (cpg_address* p = current; p < current+nCurrent; ++p)
+ for (const cpg_address* p = current; p < current+nCurrent; ++p)
addresses.append(MemberId(*p).str());
deliverEvent(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), self));
}
@@ -833,9 +839,9 @@
"INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT"
};
assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1);
- o << "cluster:" << STATE[cluster.state];
+ o << "cluster(" << cluster.self << " " << STATE[cluster.state];
if (cluster.settings.checkErrors && cluster.error.isUnresolved()) o << "/error";
- return o;
+ return o << ")";;
}
MemberId Cluster::getId() const {
@@ -846,14 +852,6 @@
return broker; // Immutable, no need to lock.
}
-void Cluster::checkQuorum() {
- if (!quorum.isQuorate()) {
- QPID_LOG(critical, *this << " disconnected from cluster quorum, shutting down");
- leave();
- throw Exception(QPID_MSG(*this << " disconnected from cluster quorum."));
- }
-}
-
void Cluster::setClusterId(const Uuid& uuid, Lock&) {
clusterId = uuid;
if (mgmtObject) {
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cluster.h?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cluster.h Mon Oct 5 11:29:03 2009
@@ -101,8 +101,6 @@
broker::Broker& getBroker() const;
Multicaster& getMulticast() { return mcast; }
- void checkQuorum();
-
const ClusterSettings& getSettings() const { return settings; }
void deliverFrame(const EventFrame&);
@@ -169,7 +167,7 @@
// == Called in CPG dispatch thread
void deliver( // CPG deliver callback.
cpg_handle_t /*handle*/,
- struct cpg_name *group,
+ const struct cpg_name *group,
uint32_t /*nodeid*/,
uint32_t /*pid*/,
void* /*msg*/,
@@ -179,10 +177,10 @@
void configChange( // CPG config change callback.
cpg_handle_t /*handle*/,
- struct cpg_name */*group*/,
- struct cpg_address */*members*/, int /*nMembers*/,
- struct cpg_address */*left*/, int /*nLeft*/,
- struct cpg_address */*joined*/, int /*nJoined*/
+ const struct cpg_name */*group*/,
+ const struct cpg_address */*members*/, int /*nMembers*/,
+ const struct cpg_address */*left*/, int /*nLeft*/,
+ const struct cpg_address */*joined*/, int /*nJoined*/
);
// == Called in management threads.
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Mon Oct 5 11:29:03 2009
@@ -72,7 +72,7 @@
("cluster-cman", optValue(settings.quorum), "Integrate with Cluster Manager (CMAN) cluster.")
#endif
("cluster-read-max", optValue(settings.readMax,"N"), "Experimental: flow-control limit reads per connection. 0=no limit.")
- // FIXME aconway 2009-05-20: temporary
+ // TODO aconway 2009-05-20: temporary, remove
("cluster-check-errors", optValue(settings.checkErrors, "yes|no"), "Enable/disable cluster error checks. Normally should be enabled.")
;
}
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ClusterSettings.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ClusterSettings.h?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ClusterSettings.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ClusterSettings.h Mon Oct 5 11:29:03 2009
@@ -37,7 +37,7 @@
bool checkErrors;
ClusterSettings() : quorum(false), readMax(10),
- checkErrors(true) // FIXME aconway 2009-05-20: temporary
+ checkErrors(true) // TODO aconway 2009-05-20: remove this option.
{}
Url getUrl(uint16_t port) const {
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cpg.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cpg.cpp?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cpg.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cpg.cpp Mon Oct 5 11:29:03 2009
@@ -49,6 +49,28 @@
// Global callback functions.
void Cpg::globalDeliver (
cpg_handle_t handle,
+ const struct cpg_name *group,
+ uint32_t nodeid,
+ uint32_t pid,
+ void* msg,
+ size_t msg_len)
+{
+ cpgFromHandle(handle)->handler.deliver(handle, group, nodeid, pid, msg, msg_len);
+}
+
+void Cpg::globalConfigChange(
+ cpg_handle_t handle,
+ const struct cpg_name *group,
+ const struct cpg_address *members, size_t nMembers,
+ const struct cpg_address *left, size_t nLeft,
+ const struct cpg_address *joined, size_t nJoined
+)
+{
+ cpgFromHandle(handle)->handler.configChange(handle, group, members, nMembers, left, nLeft, joined, nJoined);
+}
+
+void Cpg::globalDeliver (
+ cpg_handle_t handle,
struct cpg_name *group,
uint32_t nodeid,
uint32_t pid,
@@ -83,7 +105,7 @@
QPID_LOG(info, "Initializing CPG");
cpg_error_t err = cpg_initialize(&handle, &callbacks);
- int retries = 6;
+ int retries = 6; // FIXME aconway 2009-08-06: configure, use same config for cman connection.
while (err == CPG_ERR_TRY_AGAIN && --retries) {
QPID_LOG(notice, "Re-trying CPG initialization.");
sys::sleep(5);
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cpg.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cpg.h?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cpg.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cpg.h Mon Oct 5 11:29:03 2009
@@ -20,7 +20,6 @@
*/
#include "qpid/Exception.h"
-#include "qpid/cluster/Dispatchable.h"
#include "qpid/cluster/types.h"
#include "qpid/sys/IOHandle.h"
#include "qpid/sys/Mutex.h"
@@ -68,7 +67,7 @@
virtual ~Handler() {};
virtual void deliver(
cpg_handle_t /*handle*/,
- struct cpg_name *group,
+ const struct cpg_name *group,
uint32_t /*nodeid*/,
uint32_t /*pid*/,
void* /*msg*/,
@@ -76,10 +75,10 @@
virtual void configChange(
cpg_handle_t /*handle*/,
- struct cpg_name */*group*/,
- struct cpg_address */*members*/, int /*nMembers*/,
- struct cpg_address */*left*/, int /*nLeft*/,
- struct cpg_address */*joined*/, int /*nJoined*/
+ const struct cpg_name */*group*/,
+ const struct cpg_address */*members*/, int /*nMembers*/,
+ const struct cpg_address */*left*/, int /*nLeft*/,
+ const struct cpg_address */*joined*/, int /*nJoined*/
) = 0;
};
@@ -122,6 +121,24 @@
static Cpg* cpgFromHandle(cpg_handle_t);
+ // New versions for corosync 1.0 and higher
+ static void globalDeliver(
+ cpg_handle_t handle,
+ const struct cpg_name *group,
+ uint32_t nodeid,
+ uint32_t pid,
+ void* msg,
+ size_t msg_len);
+
+ static void globalConfigChange(
+ cpg_handle_t handle,
+ const struct cpg_name *group,
+ const struct cpg_address *members, size_t nMembers,
+ const struct cpg_address *left, size_t nLeft,
+ const struct cpg_address *joined, size_t nJoined
+ );
+
+ // Old versions for openais
static void globalDeliver(
cpg_handle_t handle,
struct cpg_name *group,
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Event.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Event.cpp?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Event.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Event.cpp Mon Oct 5 11:29:03 2009
@@ -72,7 +72,7 @@
if (buf.available() < e.size)
throw Exception("Not enough data for multicast event");
e.store = RefCountedBuffer::create(e.size + HEADER_SIZE);
- memcpy(e.getData(), buf.getPointer() + buf.getPosition(), e.size);
+ buf.getRawData((uint8_t*)(e.getData()), e.size);
return e;
}
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Multicaster.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Multicaster.cpp?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Multicaster.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Multicaster.cpp Mon Oct 5 11:29:03 2009
@@ -24,10 +24,14 @@
#include "qpid/log/Statement.h"
#include "qpid/framing/AMQBody.h"
#include "qpid/framing/AMQFrame.h"
+#include <boost/bind.hpp>
+#include <algorithm>
namespace qpid {
namespace cluster {
+static const int MCAST_IOV_MAX=63; // Limit imposed by CPG
+
Multicaster::Multicaster(Cpg& cpg_,
const boost::shared_ptr<sys::Poller>& poller,
boost::function<void()> onError_) :
@@ -36,7 +40,8 @@
#endif
onError(onError_), cpg(cpg_),
queue(boost::bind(&Multicaster::sendMcast, this, _1), poller),
- holding(true)
+ holding(true),
+ ioVector(MCAST_IOV_MAX)
{
queue.start();
}
@@ -70,26 +75,29 @@
queue.push(e);
}
-
-Multicaster::PollableEventQueue::Batch::const_iterator Multicaster::sendMcast(const PollableEventQueue::Batch& values) {
+Multicaster::PollableEventQueue::Batch::const_iterator Multicaster::sendMcast(
+ const PollableEventQueue::Batch& events)
+{
+ PollableEventQueue::Batch::const_iterator i = events.begin();
try {
- PollableEventQueue::Batch::const_iterator i = values.begin();
- while( i != values.end()) {
- iovec iov = i->toIovec();
- if (!cpg.mcast(&iov, 1)) {
- // cpg didn't send because of CPG flow control.
- break;
+ while (i < events.end()) {
+ size_t count = std::min(MCAST_IOV_MAX, int(events.end() - i));
+ std::transform(i, i+count, ioVector.begin(),
+ boost::bind(&Event::toIovec, _1));
+ if (!cpg.mcast(&ioVector.front(), count)) {
+ QPID_LOG(trace, "CPG flow control, will resend "
+ << events.end() - i << " events");
+ break;
}
- ++i;
+ i += count;
}
- return i;
}
catch (const std::exception& e) {
QPID_LOG(critical, "Multicast error: " << e.what());
queue.stop();
onError();
- return values.end();
}
+ return i;
}
void Multicaster::release() {
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Multicaster.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Multicaster.h?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Multicaster.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Multicaster.h Mon Oct 5 11:29:03 2009
@@ -29,6 +29,7 @@
#include "qpid/sys/LatencyTracker.h"
#include <boost/shared_ptr.hpp>
#include <deque>
+#include <vector>
namespace qpid {
@@ -72,7 +73,7 @@
PollableEventQueue queue;
bool holding;
PlainEventQueue holdingQueue;
- std::vector<struct ::iovec> ioVector;
+ std::vector< ::iovec> ioVector;
};
}} // namespace qpid::cluster
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Mon Oct 5 11:29:03 2009
@@ -46,7 +46,6 @@
void OutputInterceptor::send(framing::AMQFrame& f) {
LATENCY_TRACK(doOutputTracker.finish(f.getBody()));
- parent.getCluster().checkQuorum();
{
sys::Mutex::ScopedLock l(lock);
next->send(f);
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp Mon Oct 5 11:29:03 2009
@@ -33,15 +33,18 @@
boost::bind(&PollerDispatch::dispatch, this, _1), // read
0, // write
boost::bind(&PollerDispatch::disconnect, this, _1) // disconnect
- )
+ ),
+ started(false)
{}
PollerDispatch::~PollerDispatch() {
- dispatchHandle.stopWatch();
+ if (started)
+ dispatchHandle.stopWatch();
}
void PollerDispatch::start() {
dispatchHandle.startWatch(poller);
+ started = true;
}
// Entry point: called by IO to dispatch CPG events.
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/PollerDispatch.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/PollerDispatch.h?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/PollerDispatch.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/PollerDispatch.h Mon Oct 5 11:29:03 2009
@@ -51,6 +51,7 @@
boost::shared_ptr<sys::Poller> poller;
boost::function<void()> onError;
sys::DispatchHandleRef dispatchHandle;
+ bool started;
};
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp Mon Oct 5 11:29:03 2009
@@ -18,28 +18,86 @@
* under the License.
*
*/
+
#include "qpid/cluster/Quorum_cman.h"
+#include "qpid/cluster/Cluster.h"
#include "qpid/log/Statement.h"
#include "qpid/Options.h"
#include "qpid/sys/Time.h"
+#include "qpid/sys/posix/PrivatePosix.h"
namespace qpid {
namespace cluster {
-Quorum::Quorum() : enable(false), cman(0) {}
+namespace {
+
+boost::function<void()> errorFn;
-Quorum::~Quorum() { if (cman) cman_finish(cman); }
+void cmanCallbackFn(cman_handle_t handle, void */*privdata*/, int reason, int arg) {
+ if (reason == CMAN_REASON_STATECHANGE && arg == 0) {
+ QPID_LOG(critical, "Lost contact with cluster quorum.");
+ if (errorFn) errorFn();
+ cman_stop_notification(handle);
+ }
+}
+}
+
+Quorum::Quorum(boost::function<void()> err) : enable(false), cman(0), cmanFd(0) {
+ errorFn = err;
+}
+
+Quorum::~Quorum() {
+ dispatchHandle.reset();
+ if (cman) cman_finish(cman);
+}
-void Quorum::init() {
+void Quorum::start(boost::shared_ptr<sys::Poller> p) {
+ poller = p;
enable = true;
+ QPID_LOG(debug, "Connecting to quorum service.");
cman = cman_init(0);
if (cman == 0) throw ErrnoException("Can't connect to cman service");
if (!cman_is_quorate(cman)) {
QPID_LOG(notice, "Waiting for cluster quorum.");
while(!cman_is_quorate(cman)) sys::sleep(5);
}
+ int err = cman_start_notification(cman, cmanCallbackFn);
+ if (err != 0) throw ErrnoException("Can't register for cman notifications");
+ watch(getFd());
}
-bool Quorum::isQuorate() { return enable ? cman_is_quorate(cman) : true; }
+void Quorum::watch(int fd) {
+ cmanFd = fd;
+ dispatchHandle.reset(
+ new sys::DispatchHandleRef(
+ sys::PosixIOHandle(cmanFd),
+ boost::bind(&Quorum::dispatch, this, _1), // read
+ 0, // write
+ boost::bind(&Quorum::disconnect, this, _1) // disconnect
+ ));
+ dispatchHandle->startWatch(poller);
+}
+
+int Quorum::getFd() {
+ int fd = cman_get_fd(cman);
+ if (fd == 0) throw ErrnoException("Can't get cman file descriptor");
+ return fd;
+}
+
+void Quorum::dispatch(sys::DispatchHandle&) {
+ try {
+ cman_dispatch(cman, CMAN_DISPATCH_ALL);
+ int fd = getFd();
+ if (fd != cmanFd) watch(fd);
+ } catch (const std::exception& e) {
+ QPID_LOG(critical, "Error in quorum dispatch: " << e.what());
+ errorFn();
+ }
+}
+
+void Quorum::disconnect(sys::DispatchHandle&) {
+ QPID_LOG(critical, "Disconnected from quorum service");
+ errorFn();
+}
}} // namespace qpid::cluster
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Quorum_cman.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Quorum_cman.h?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Quorum_cman.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Quorum_cman.h Mon Oct 5 11:29:03 2009
@@ -22,26 +22,40 @@
*
*/
+#include <qpid/sys/DispatchHandle.h>
+#include <boost/function.hpp>
+#include <boost/shared_ptr.hpp>
+#include <memory>
+
extern "C" {
#include <libcman.h>
}
namespace qpid {
-
-class Options;
+namespace sys {
+class Poller;
+}
namespace cluster {
+class Cluster;
class Quorum {
public:
- Quorum();
+ Quorum(boost::function<void ()> onError);
~Quorum();
- void init();
- bool isQuorate();
+ void start(boost::shared_ptr<sys::Poller>);
private:
+ void dispatch(sys::DispatchHandle&);
+ void disconnect(sys::DispatchHandle&);
+ int getFd();
+ void watch(int fd);
+
bool enable;
cman_handle_t cman;
+ int cmanFd;
+ std::auto_ptr<sys::DispatchHandleRef> dispatchHandle;
+ boost::shared_ptr<sys::Poller> poller;
};
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Quorum_null.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Quorum_null.h?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Quorum_null.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Quorum_null.h Mon Oct 5 11:29:03 2009
@@ -21,15 +21,20 @@
* under the License.
*
*/
+
+#include <boost/shared_ptr.hpp>
+#include <boost/function.hpp>
+
namespace qpid {
namespace cluster {
+class Cluster;
/** Null implementation of quorum. */
class Quorum {
public:
- void init() {}
- bool isQuorate() { return true; }
+ Quorum(boost::function<void ()>) {}
+ void start(boost::shared_ptr<sys::Poller>) {}
};
#endif /*!QPID_CLUSTER_QUORUM_NULL_H*/
Propchange: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/framing/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Mon Oct 5 11:29:03 2009
@@ -1,3 +1,157 @@
+DtxSetTimeoutBody.cpp
+AllInvoker.cpp
+XaResult.cpp
+ClusterConnectionTxDequeueBody.cpp
+ClusterUpdateRequestBody.cpp
+SessionKnownCompletedBody.cpp
+MessageAcquireBody.cpp
+ClientInvoker.cpp
+ExchangeDeclareBody.cpp
+ClusterConnectionOutputTaskBody.cpp
+ClusterConnectionSessionStateBody.cpp
+ClusterUpdateOfferBody.cpp
+ConnectionStartOkBody.cpp
+ClusterShutdownBody.cpp
+ConnectionTuneOkBody.cpp
+AMQP_ClientProxy.h
+ServerInvoker.cpp
+MessageSubscribeBody.cpp
+TxSelectBody.cpp
+QueueQueryResult.cpp
+FileQosOkBody.cpp
+ExecutionSyncBody.cpp
+ClusterConnectionDeliverCloseBody.cpp
+Xid.cpp
+ClusterMessageExpiredBody.cpp
+AMQP_ServerProxy.h
+ClusterConnectionMembershipBody.cpp
+StreamQosBody.cpp
+ConnectionOpenBody.cpp
+DtxGetTimeoutResult.cpp
+ExchangeQueryResult.cpp
+FileOpenBody.cpp
+MessageResumeBody.cpp
+DtxRecoverBody.cpp
+ClusterConnectionAnnounceBody.cpp
+MessageCancelBody.cpp
+ExchangeBoundResult.cpp
+ClusterConnectionTxEnqueueBody.cpp
+DtxGetTimeoutBody.cpp
+MethodBodyFactory.cpp
+SessionGapBody.cpp
+ConnectionHeartbeatBody.cpp
+SessionCompletedBody.cpp
+ConnectionRedirectBody.cpp
+ConnectionCloseBody.cpp
+MessageTransferBody.cpp
+ConnectionTuneBody.cpp
+DtxSelectBody.cpp
+ExecutionResultBody.cpp
+FileStageBody.cpp
+QueueDeleteBody.cpp
+StreamQosOkBody.cpp
+ClusterConnectionRetractOfferBody.cpp
+SessionConfirmedBody.cpp
+MessageFlowBody.cpp
+ConnectionCloseOkBody.cpp
+AMQP_ServerOperations.h
+ExecutionExceptionBody.cpp
+MessageReleaseBody.cpp
+ClusterRetractOfferBody.cpp
+FragmentProperties.cpp
+SessionRequestTimeoutBody.cpp
+ClusterReadyBody.cpp
+DtxRollbackBody.cpp
+DtxEndBody.cpp
+SessionTimeoutBody.cpp
+FileQosBody.cpp
+SessionExpectedBody.cpp
+ConnectionSecureBody.cpp
+ClusterConfigChangeBody.cpp
+ExchangeBindBody.cpp
+ClusterConnectionAddQueueListenerBody.cpp
+ClusterConnectionTxStartBody.cpp
+ClusterConnectionTxEndBody.cpp
+ExchangeDeleteBody.cpp
+FileRejectBody.cpp
+ClusterConnectionExpiryIdBody.cpp
+AMQP_ClientProxy.cpp
+ClusterConnectionDeliverDoOutputBody.cpp
+StreamPublishBody.cpp
+DeliveryProperties.cpp
+FileConsumeOkBody.cpp
+SessionDetachedBody.cpp
+SessionAttachBody.cpp
+AMQP_ServerProxy.cpp
+StreamProperties.cpp
+AllInvoker.h
+MessageSetFlowModeBody.cpp
+TypeCode.cpp
+FileAckBody.cpp
+ConnectionSecureOkBody.cpp
+AMQP_AllProxy.cpp
+MethodBodyDefaultVisitor.h
+DtxStartBody.cpp
+reply_exceptions.cpp
+FileProperties.cpp
+ClusterConnectionAccumulatedAckBody.cpp
+ClusterConnectionAbortBody.cpp
+ClusterErrorCheckBody.cpp
+TxCommitBody.cpp
+TxRollbackBody.cpp
+MethodBodyDefaultVisitor.cpp
+StreamDeliverBody.cpp
+ClusterConnectionQueueBody.cpp
+QueueQueryBody.cpp
+ConnectionOpenOkBody.cpp
+StreamConsumeBody.cpp
+FileOpenOkBody.cpp
+ClusterConnectionShadowReadyBody.cpp
+SessionDetachBody.cpp
+DtxForgetBody.cpp
+frame_body_lists.h
+ClusterConnectionConsumerStateBody.cpp
+StreamConsumeOkBody.cpp
+AMQP_AllOperations.h
+AMQP_ClientOperations.h
+MessageStopBody.cpp
+FilePublishBody.cpp
+ExchangeBoundBody.cpp
+ReplyTo.cpp
+MessageRejectBody.cpp
+ExchangeUnbindBody.cpp
+SessionFlushBody.cpp
+MessageFlushBody.cpp
+QueuePurgeBody.cpp
+StreamReturnBody.cpp
+StreamCancelBody.cpp
+ClientInvoker.h
+DtxCommitBody.cpp
+Header.cpp
+MethodBodyConstVisitor.h
+DtxPrepareBody.cpp
+FileReturnBody.cpp
+SessionAttachedBody.cpp
+FileCancelBody.cpp
+ServerInvoker.h
+SessionCommandPointBody.cpp
+ClusterConnectionDeliveryRecordBody.cpp
+MessageAcquireResult.cpp
+AMQP_AllProxy.h
+MessageProperties.cpp
+all_method_bodies.h
+MessageResumeResult.cpp
+FileDeliverBody.cpp
+ClusterConnectionQueuePositionBody.cpp
+ClusterConnectionTxAcceptBody.cpp
+ClusterConnectionExchangeBody.cpp
+QueueDeclareBody.cpp
+MessageAcceptBody.cpp
+ClusterConnectionTxPublishBody.cpp
+DtxRecoverResult.cpp
+ExchangeQueryBody.cpp
+FileConsumeBody.cpp
+ConnectionStartBody.cpp
.deps
.libs
.dirstamp
@@ -6,3 +160,4 @@
method_variants.h
MethodHolderMaxSize.h
MaxMethodBodySize.h
+
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/framing/AMQFrame.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/framing/AMQFrame.cpp?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/framing/AMQFrame.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/framing/AMQFrame.cpp Mon Oct 5 11:29:03 2009
@@ -41,7 +41,7 @@
AMQFrame::AMQFrame(const AMQBody& b) : body(b.clone()) { init(); }
-AMQFrame::~AMQFrame() { init(); }
+AMQFrame::~AMQFrame() {}
AMQBody* AMQFrame::getBody() {
// Non-const AMQBody* may be used to modify the body.
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/DispatchHandle.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/DispatchHandle.cpp?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/DispatchHandle.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/DispatchHandle.cpp Mon Oct 5 11:29:03 2009
@@ -284,10 +284,7 @@
readableCallback(*this);
writableCallback(*this);
break;
- case Poller::DISCONNECTED: {
- ScopedLock<Mutex> lock(stateLock);
- poller->unmonitorHandle(*this, Poller::INOUT);
- }
+ case Poller::DISCONNECTED:
if (disconnectedCallback) {
disconnectedCallback(*this);
}
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/Timer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/Timer.cpp?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/Timer.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/Timer.cpp Mon Oct 5 11:29:03 2009
@@ -55,9 +55,10 @@
fire();
}
+// This can only be used to setup the next fire time. After the Timer has already fired
void TimerTask::setupNextFire() {
if (period && readyToFire()) {
- nextFireTime = AbsTime(nextFireTime, period);
+ nextFireTime = max(AbsTime::now(), AbsTime(nextFireTime, period));
cancelled = false;
} else {
QPID_LOG(error, "Couldn't setup next timer firing: " << Duration(nextFireTime, AbsTime::now()) << "[" << period << "]");
@@ -66,13 +67,11 @@
// Only allow tasks to be delayed
void TimerTask::restart() { nextFireTime = max(nextFireTime, AbsTime(AbsTime::now(), period)); }
-void TimerTask::delayTill(AbsTime time) { period = 0; nextFireTime = max(nextFireTime, time); }
void TimerTask::cancel() {
ScopedLock<Mutex> l(callbackLock);
cancelled = true;
}
-bool TimerTask::isCancelled() const { return cancelled; }
Timer::Timer() :
active(false)
@@ -98,13 +97,13 @@
// warn on extreme lateness
AbsTime start(AbsTime::now());
- Duration late(t->sortTime, start);
- if (late > 500 * TIME_MSEC) {
- QPID_LOG(warning, "Timer delayed by " << late / TIME_MSEC << "ms");
- }
+ Duration delay(t->sortTime, start);
{
ScopedLock<Mutex> l(t->callbackLock);
if (t->cancelled) {
+ if (delay > 500 * TIME_MSEC) {
+ QPID_LOG(debug, "cancelled Timer woken up " << delay / TIME_MSEC << "ms late");
+ }
continue;
} else if(Duration(t->nextFireTime, start) >= 0) {
Monitor::ScopedUnlock u(monitor);
@@ -112,7 +111,17 @@
// Warn on callback overrun
AbsTime end(AbsTime::now());
Duration overrun(tasks.top()->nextFireTime, end);
- if (overrun > 1 * TIME_MSEC) {
+ bool late = delay > 1 * TIME_MSEC;
+ bool overran = overrun > 1 * TIME_MSEC;
+ if (late)
+ if (overran) {
+ QPID_LOG(warning,
+ "Timer woken up " << delay / TIME_MSEC << "ms late, "
+ "overrunning by " << overrun / TIME_MSEC << "ms [taking "
+ << Duration(start, end) << "]");
+ } else {
+ QPID_LOG(warning, "Timer woken up " << delay / TIME_MSEC << "ms late");
+ } else if (overran) {
QPID_LOG(warning,
"Timer callback overran by " << overrun / TIME_MSEC << "ms [taking "
<< Duration(start, end) << "]");
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/Timer.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/Timer.h?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/Timer.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/Timer.h Mon Oct 5 11:29:03 2009
@@ -58,9 +58,7 @@
QPID_COMMON_EXTERN void setupNextFire();
QPID_COMMON_EXTERN void restart();
- QPID_COMMON_EXTERN void delayTill(AbsTime fireTime);
QPID_COMMON_EXTERN void cancel();
- QPID_COMMON_EXTERN bool isCancelled() const;
protected:
// Must be overridden with callback
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp Mon Oct 5 11:29:03 2009
@@ -575,6 +575,11 @@
// (just not writable), allow us to readable until we get here again
if (epe.events & ::EPOLLHUP) {
if (eh.isHungup()) {
+ eh.setInactive();
+ // Don't set up last Handle so that we don't reset this handle
+ // on re-entering Poller::wait. This means that we will never
+ // be set active again once we've returned disconnected, and so
+ // can never be returned again.
return Event(handle, DISCONNECTED);
}
eh.setHungup();
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/windows/LockFile.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/windows/LockFile.cpp?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/windows/LockFile.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/windows/LockFile.cpp Mon Oct 5 11:29:03 2009
@@ -43,7 +43,8 @@
create ? OPEN_ALWAYS : OPEN_EXISTING,
FILE_FLAG_DELETE_ON_CLOSE, /* Delete file when closed */
NULL);
- QPID_WINDOWS_CHECK_NOT(h, INVALID_HANDLE_VALUE);
+ if (h == INVALID_HANDLE_VALUE)
+ throw qpid::Exception(path + qpid::sys::strError(GetLastError()));
impl.reset(new LockFilePrivate(h));
}
Propchange: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Mon Oct 5 11:29:03 2009
@@ -1,3 +1,4 @@
+allSegmentTypes.h
Makefile.in
Makefile
@@ -41,3 +42,4 @@
qpid_ping
datagen
+
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/Makefile.am?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/Makefile.am Mon Oct 5 11:29:03 2009
@@ -256,6 +256,18 @@
datagen_SOURCES=datagen.cpp
datagen_LDADD=$(lib_common)
+check_PROGRAMS+=qrsh_server
+qrsh_server_SOURCES=qrsh_server.cpp
+qrsh_server_LDADD=$(lib_client)
+
+check_PROGRAMS+=qrsh_run
+qrsh_run_SOURCES=qrsh_run.cpp
+qrsh_run_LDADD=$(lib_client)
+
+check_PROGRAMS+=qrsh
+qrsh_SOURCES=qrsh.cpp
+qrsh_LDADD=$(lib_client)
+
TESTS_ENVIRONMENT = \
VALGRIND=$(VALGRIND) \
@@ -305,9 +317,8 @@
# Not run under valgrind, too slow
LONG_TESTS+=start_broker fanout_perftest shared_perftest multiq_perftest topic_perftest run_ring_queue_test stop_broker \
- run_failover_soak \
+ run_failover_soak reliable_replication_test \
federated_cluster_test_with_node_failure
-# TODO: renable the temporarily disabled the failing reliable_replication_test when QPID-1984 is resolved.
EXTRA_DIST+=fanout_perftest shared_perftest multiq_perftest topic_perftest run_failover_soak reliable_replication_test \
federated_cluster_test_with_node_failure \
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/PollerTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/PollerTest.cpp?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/PollerTest.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/PollerTest.cpp Mon Oct 5 11:29:03 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -50,7 +50,7 @@
int lastWrite = ::write(fd, s.c_str(), s.size());
if ( lastWrite >= 0) {
bytesWritten += lastWrite;
- }
+ }
} while (errno != EAGAIN);
return bytesWritten;
}
@@ -58,32 +58,32 @@
int readALot(int fd) {
int bytesRead = 0;
char buf[1024];
-
+
do {
errno = 0;
int lastRead = ::read(fd, buf, sizeof(buf));
if ( lastRead >= 0) {
bytesRead += lastRead;
- }
+ }
} while (errno != EAGAIN);
return bytesRead;
}
int main(int /*argc*/, char** /*argv*/)
{
- try
+ try
{
int sv[2];
int rc = ::socketpair(AF_UNIX, SOCK_STREAM, 0, sv);
assert(rc >= 0);
-
+
// Set non-blocking
rc = ::fcntl(sv[0], F_SETFL, O_NONBLOCK);
assert(rc >= 0);
rc = ::fcntl(sv[1], F_SETFL, O_NONBLOCK);
assert(rc >= 0);
-
+
// Make up a large string
string testString = "This is only a test ... 1,2,3,4,5,6,7,8,9,10;";
for (int i = 0; i < 6; i++)
@@ -97,32 +97,32 @@
// Write as much as we can to socket 0
int bytesWritten = writeALot(sv[0], testString);
cout << "Wrote(0): " << bytesWritten << " bytes\n";
-
+
// Read as much as we can from socket 1
bytesRead = readALot(sv[1]);
assert(bytesRead == bytesWritten);
cout << "Read(1): " << bytesRead << " bytes\n";
auto_ptr<Poller> poller(new Poller);
-
+
PosixIOHandle f0(sv[0]);
PosixIOHandle f1(sv[1]);
PollerHandle h0(f0);
PollerHandle h1(f1);
-
+
poller->registerHandle(h0);
poller->monitorHandle(h0, Poller::INOUT);
-
+
// h0 should be writable
Poller::Event event = poller->wait();
assert(event.handle == &h0);
assert(event.type == Poller::WRITABLE);
-
+
// Write as much as we can to socket 0
bytesWritten = writeALot(sv[0], testString);
cout << "Wrote(0): " << bytesWritten << " bytes\n";
-
+
// Wait for 500ms - h0 no longer writable
event = poller->wait(500000000);
assert(event.handle == 0);
@@ -133,7 +133,7 @@
event = poller->wait();
assert(event.handle == &h1);
assert(event.type == Poller::READ_WRITABLE);
-
+
bytesRead = readALot(sv[1]);
assert(bytesRead == bytesWritten);
cout << "Read(1): " << bytesRead << " bytes\n";
@@ -147,11 +147,11 @@
// Test multiple interrupts
assert(poller->interrupt(h0) == true);
assert(poller->interrupt(h1) == true);
-
+
// Make sure we can interrupt them again
assert(poller->interrupt(h0) == true);
assert(poller->interrupt(h1) == true);
-
+
// Make sure that they both come out
event = poller->wait();
assert(event.type == Poller::INTERRUPTED);
@@ -170,25 +170,44 @@
event = poller->wait();
assert(event.handle == &h0);
- assert(event.type == Poller::WRITABLE);
+ assert(event.type == Poller::WRITABLE);
// We didn't write anything so it should still be writable
event = poller->wait();
assert(event.handle == &h0);
- assert(event.type == Poller::WRITABLE);
+ assert(event.type == Poller::WRITABLE);
poller->unmonitorHandle(h0, Poller::INOUT);
event = poller->wait(500000000);
assert(event.handle == 0);
-
+
poller->unregisterHandle(h1);
+ assert(poller->interrupt(h1) == false);
+
+ // close the other end to force a disconnect
+ ::close(sv[1]);
+
+ // Now make sure that we are readable followed by disconnected
+ // and after that we never return again
+ poller->monitorHandle(h0, Poller::INOUT);
+ event = poller->wait(500000000);
+ assert(event.handle == &h0);
+ assert(event.type == Poller::READABLE);
+ event = poller->wait(500000000);
+ assert(event.handle == &h0);
+ assert(event.type == Poller::DISCONNECTED);
+ event = poller->wait(1500000000);
+ assert(event.handle == 0);
+
+ // Now we're disconnected monitoring should have no effect at all
+ poller->unmonitorHandle(h0, Poller::INOUT);
+ event = poller->wait(1500000000);
+ assert(event.handle == 0);
+
poller->unregisterHandle(h0);
-
- // Make sure we can't interrupt them now
assert(poller->interrupt(h0) == false);
- assert(poller->interrupt(h1) == false);
-
+
// Test shutdown
poller->shutdown();
event = poller->wait();
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/TimerTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/TimerTest.cpp?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/TimerTest.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/TimerTest.cpp Mon Oct 5 11:29:03 2009
@@ -19,7 +19,7 @@
* under the License.
*
*/
-#include "qpid/broker/Timer.h"
+#include "qpid/sys/Timer.h"
#include "qpid/sys/Monitor.h"
#include "unit_test.h"
#include <math.h>
@@ -28,7 +28,6 @@
#include <boost/format.hpp>
#include <boost/lexical_cast.hpp>
-using namespace qpid::broker;
using namespace qpid::sys;
using boost::intrusive_ptr;
using boost::dynamic_pointer_cast;
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/acl.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/acl.py?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/acl.py (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/acl.py Mon Oct 5 11:29:03 2009
@@ -23,36 +23,10 @@
from qpid.util import connect
from qpid.connection import Connection
from qpid.datatypes import uuid4
-from qpid.testlib import TestBase010, testrunner
+from qpid.testlib import TestBase010
from qmf.console import Session
from qpid.datatypes import Message
-def scan_args(name, default=None, args=sys.argv[1:]):
- if (name in args):
- pos = args.index(name)
- return args[pos + 1]
- elif default:
- return default
- else:
- print "Please specify extra argument: %s" % name
- sys.exit(2)
-
-def extract_args(name, args):
- if (name in args):
- pos = args.index(name)
- del args[pos:pos+2]
- else:
- return None
-
-def get_broker_port():
- return scan_args("--port", "5672")
-
-def get_session(user, passwd):
- socket = connect('127.0.0.1', int(get_broker_port()))
- connection = Connection (sock=socket, username=user, password=passwd)
- connection.start()
- return connection.session(str(uuid4()))
-
class ACLFile:
def __init__(self):
self.f = open('data_dir/policy.acl','w');
@@ -65,6 +39,12 @@
class ACLTests(TestBase010):
+ def get_session(self, user, passwd):
+ socket = connect(self.broker.host, self.broker.port)
+ connection = Connection (sock=socket, username=user, password=passwd)
+ connection.start()
+ return connection.session(str(uuid4()))
+
def reload_acl(self):
acl = self.qmf.getObjects(_class="acl")[0]
return acl.reloadACLFile()
@@ -93,7 +73,7 @@
self.reload_acl()
- session = get_session('bob','bob')
+ session = self.get_session('bob','bob')
try:
session.queue_declare(queue="deny_queue")
except qpid.session.SessionException, e:
@@ -118,7 +98,7 @@
self.reload_acl()
- session = get_session('bob','bob')
+ session = self.get_session('bob','bob')
try:
session.queue_declare(queue="allow_queue")
except qpid.session.SessionException, e:
@@ -146,7 +126,7 @@
self.reload_acl()
- session = get_session('bob','bob')
+ session = self.get_session('bob','bob')
try:
session.queue_declare(queue="allow_queue")
except qpid.session.SessionException, e:
@@ -243,21 +223,21 @@
self.reload_acl()
- session = get_session('bob','bob')
+ session = self.get_session('bob','bob')
try:
session.queue_declare(queue="q1", durable='true', passive='true')
self.fail("ACL should deny queue create request with name=q1 durable=true passive=true");
except qpid.session.SessionException, e:
self.assertEqual(530,e.args[0].error_code)
- session = get_session('bob','bob')
+ session = self.get_session('bob','bob')
try:
session.queue_declare(queue="q2", exclusive='true')
self.fail("ACL should deny queue create request with name=q2 exclusive=true");
except qpid.session.SessionException, e:
self.assertEqual(530,e.args[0].error_code)
- session = get_session('bob','bob')
+ session = self.get_session('bob','bob')
try:
session.queue_declare(queue="q3", exclusive='true')
@@ -271,14 +251,14 @@
self.fail("ACL should deny queue query request for q3");
except qpid.session.SessionException, e:
self.assertEqual(530,e.args[0].error_code)
- session = get_session('bob','bob')
+ session = self.get_session('bob','bob')
try:
session.queue_purge(queue="q3")
self.fail("ACL should deny queue purge request for q3");
except qpid.session.SessionException, e:
self.assertEqual(530,e.args[0].error_code)
- session = get_session('bob','bob')
+ session = self.get_session('bob','bob')
try:
session.queue_purge(queue="q4")
@@ -291,7 +271,7 @@
self.fail("ACL should deny queue delete request for q4");
except qpid.session.SessionException, e:
self.assertEqual(530,e.args[0].error_code)
- session = get_session('bob','bob')
+ session = self.get_session('bob','bob')
try:
session.queue_delete(queue="q3")
@@ -319,21 +299,21 @@
self.reload_acl()
- session = get_session('bob','bob')
+ session = self.get_session('bob','bob')
try:
session.exchange_declare(exchange='testEx', durable='true', passive='true')
self.fail("ACL should deny exchange create request with name=testEx durable=true passive=true");
except qpid.session.SessionException, e:
self.assertEqual(530,e.args[0].error_code)
- session = get_session('bob','bob')
+ session = self.get_session('bob','bob')
try:
session.exchange_declare(exchange='ex1', type='direct')
self.fail("ACL should deny exchange create request with name=ex1 type=direct");
except qpid.session.SessionException, e:
self.assertEqual(530,e.args[0].error_code)
- session = get_session('bob','bob')
+ session = self.get_session('bob','bob')
try:
session.exchange_declare(exchange='myXml', type='direct')
@@ -347,14 +327,14 @@
self.fail("ACL should deny queue query request for q3");
except qpid.session.SessionException, e:
self.assertEqual(530,e.args[0].error_code)
- session = get_session('bob','bob')
+ session = self.get_session('bob','bob')
try:
session.exchange_bind(exchange='myEx', queue='q1', binding_key='rk1')
self.fail("ACL should deny exchange bind request with exchange='myEx' queuename='q1' bindingkey='rk1'");
except qpid.session.SessionException, e:
self.assertEqual(530,e.args[0].error_code)
- session = get_session('bob','bob')
+ session = self.get_session('bob','bob')
try:
session.exchange_bind(exchange='myXml', queue='q1', binding_key='x')
@@ -366,7 +346,7 @@
self.fail("ACL should deny exchange unbind request with exchange='myEx' queuename='q1' bindingkey='rk1'");
except qpid.session.SessionException, e:
self.assertEqual(530,e.args[0].error_code)
- session = get_session('bob','bob')
+ session = self.get_session('bob','bob')
try:
session.exchange_unbind(exchange='myXml', queue='q1', binding_key='x')
@@ -379,7 +359,7 @@
self.fail("ACL should deny exchange delete request for myEx");
except qpid.session.SessionException, e:
self.assertEqual(530,e.args[0].error_code)
- session = get_session('bob','bob')
+ session = self.get_session('bob','bob')
try:
session.exchange_delete(exchange='myXml')
@@ -404,7 +384,7 @@
self.reload_acl()
- session = get_session('bob','bob')
+ session = self.get_session('bob','bob')
try:
@@ -420,14 +400,14 @@
self.fail("ACL should deny message subscriber request for queue='q1'");
except qpid.session.SessionException, e:
self.assertEqual(530,e.args[0].error_code)
- session = get_session('bob','bob')
+ session = self.get_session('bob','bob')
try:
session.message_subscribe(queue='q2', destination='myq1')
self.fail("ACL should deny message subscriber request for queue='q2'");
except qpid.session.SessionException, e:
self.assertEqual(530,e.args[0].error_code)
- session = get_session('bob','bob')
+ session = self.get_session('bob','bob')
try:
session.message_subscribe(queue='q3', destination='myq1')
@@ -453,7 +433,7 @@
self.reload_acl()
- session = get_session('bob','bob')
+ session = self.get_session('bob','bob')
try:
session.exchange_declare(exchange='myEx', type='topic')
@@ -468,14 +448,14 @@
self.fail("ACL should deny message transfer to name=amq.direct routingkey=rk1");
except qpid.session.SessionException, e:
self.assertEqual(530,e.args[0].error_code)
- session = get_session('bob','bob')
+ session = self.get_session('bob','bob')
try:
session.message_transfer(destination="amq.topic", message=Message(props,"Test"))
self.fail("ACL should deny message transfer to name=amq.topic");
except qpid.session.SessionException, e:
self.assertEqual(530,e.args[0].error_code)
- session = get_session('bob','bob')
+ session = self.get_session('bob','bob')
try:
session.message_transfer(destination="myEx", message=Message(props,"Test"))
@@ -489,13 +469,4 @@
session.message_transfer(destination="amq.direct", message=Message(props,"Test"))
except qpid.session.SessionException, e:
if (530 == e.args[0].error_code):
- self.fail("ACL should allow message transfer to exchange amq.direct");
-
-
-if __name__ == '__main__':
- args = sys.argv[1:]
- #need to remove the extra options from args as test runner doesn't recognize them
- extract_args("--port", args)
- args.append("acl")
-
- if not testrunner.run(args): sys.exit(1)
+ self.fail("ACL should allow message transfer to exchange amq.direct");
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/cli_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/cli_tests.py?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/cli_tests.py (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/cli_tests.py Mon Oct 5 11:29:03 2009
@@ -20,44 +20,21 @@
import sys
import os
-from qpid.testlib import TestBase010, testrunner
+from qpid.testlib import TestBase010
from qpid.datatypes import Message
from qpid.queue import Empty
from time import sleep
-def add_module(args=sys.argv[1:]):
- for a in args:
- if a.startswith("cli"):
- return False
- return True
-
-def scan_args(name, default=None, args=sys.argv[1:]):
- if (name in args):
- pos = args.index(name)
- return args[pos + 1]
- elif default:
- return default
- else:
- print "Please specify extra argument: %s" % name
- sys.exit(2)
-
-def extract_args(name, args):
- if (name in args):
- pos = args.index(name)
- del args[pos:pos+2]
- else:
- return None
-
-def remote_host():
- return scan_args("--remote-host", "localhost")
+class CliTests(TestBase010):
-def remote_port():
- return int(scan_args("--remote-port"))
+ def remote_host(self):
+ return self.defines.get("remote-host", "localhost")
-def cli_dir():
- return scan_args("--cli-dir")
+ def remote_port(self):
+ return int(self.defines["remote-port"])
-class CliTests(TestBase010):
+ def cli_dir(self):
+ return self.defines["cli-dir"]
def makeQueue(self, qname, arguments):
ret = os.system(self.command(" add queue " + qname + " " + arguments))
@@ -150,15 +127,15 @@
self.startQmf();
qmf = self.qmf
- command = cli_dir() + "/qpid-route dynamic add guest/guest@localhost:%d %s:%d amq.topic" %\
- (testrunner.port, remote_host(), remote_port())
+ command = self.cli_dir() + "/qpid-route dynamic add guest/guest@localhost:%d %s:%d amq.topic" %\
+ (self.broker.port, self.remote_host(), self.remote_port())
ret = os.system(command)
self.assertEqual(ret, 0)
links = qmf.getObjects(_class="link")
found = False
for link in links:
- if link.port == remote_port():
+ if link.port == self.remote_port():
found = True
self.assertEqual(found, True)
@@ -174,18 +151,4 @@
return None
def command(self, arg = ""):
- return cli_dir() + "/qpid-config -a localhost:%d" % testrunner.port + " " + arg
-
-
-if __name__ == '__main__':
- args = sys.argv[1:]
- #need to remove the extra options from args as test runner doesn't recognise them
- extract_args("--remote-port", args)
- extract_args("--remote-host", args)
- extract_args("--cli-dir", args)
-
- if add_module():
- #add module(s) to run to testrunners args
- args.append("cli_tests")
-
- if not testrunner.run(args): sys.exit(1)
+ return self.cli_dir() + "/qpid-config -a localhost:%d" % self.broker.port + " " + arg
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/cluster.mk
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/cluster.mk?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/cluster.mk (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/cluster.mk Mon Oct 5 11:29:03 2009
@@ -29,44 +29,47 @@
# ais_check checks pre-requisites for cluster tests and runs them if ok.
-TESTS += \
- ais_check \
- run_cluster_tests \
- federated_cluster_test \
+TESTS += \
+ ais_check \
+ test_watchdog \
+ run_cluster_tests \
+ federated_cluster_test \
clustered_replication_test
-
-EXTRA_DIST += \
- ais_check \
- start_cluster \
- stop_cluster \
- restart_cluster \
- cluster_python_tests \
- cluster_python_tests_failing.txt \
- federated_cluster_test \
- clustered_replication_test \
- run_cluster_tests \
- run_long_cluster_tests \
- testlib.py \
- cluster_tests.py \
- long_cluster_tests.py
-
-
-LONG_TESTS += \
- run_long_cluster_tests \
- start_cluster \
- cluster_python_tests \
+
+EXTRA_DIST += \
+ ais_check \
+ test_watchdog \
+ start_cluster \
+ stop_cluster \
+ restart_cluster \
+ cluster_python_tests \
+ cluster_python_tests_failing.txt \
+ federated_cluster_test \
+ clustered_replication_test \
+ run_cluster_tests \
+ run_long_cluster_tests \
+ testlib.py \
+ cluster_tests.py \
+ long_cluster_tests.py
+
+LONG_TESTS += \
+ run_long_cluster_tests \
+ start_cluster \
+ cluster_python_tests \
stop_cluster
qpidtest_PROGRAMS += cluster_test
-cluster_test_SOURCES = \
- cluster_test.cpp \
- unit_test.cpp \
- ClusterFixture.cpp \
- ClusterFixture.h \
- ForkedBroker.h \
- ForkedBroker.cpp \
- PartialFailure.cpp \
- ClusterFailover.cpp
+
+cluster_test_SOURCES = \
+ cluster_test.cpp \
+ unit_test.cpp \
+ ClusterFixture.cpp \
+ ClusterFixture.h \
+ ForkedBroker.h \
+ ForkedBroker.cpp \
+ PartialFailure.cpp \
+ ClusterFailover.cpp
+
cluster_test_LDADD=$(lib_client) $(lib_broker) -lboost_unit_test_framework
qpidtest_SCRIPTS += run_cluster_tests cluster_tests.py run_long_cluster_tests long_cluster_tests.py testlib.py
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/cluster_tests.py?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/cluster_tests.py Mon Oct 5 11:29:03 2009
@@ -21,10 +21,10 @@
import os, signal, sys, unittest
from testlib import TestBaseCluster
-class ClusterTests(TestBaseCluster):
+class ShortTests(TestBaseCluster):
"""Basic cluster with async store tests"""
- def test_Cluster_01_Initialization(self):
+ def test_01_Initialization(self):
"""Start a single cluster containing several nodes, and stop it again"""
try:
clusterName = "cluster-01"
@@ -34,7 +34,7 @@
self.killAllClusters(True)
raise
- def test_Cluster_02_MultipleClusterInitialization(self):
+ def test_02_MultipleClusterInitialization(self):
"""Start several clusters each with several nodes and stop them again"""
try:
for i in range(0, 5):
@@ -48,7 +48,7 @@
self.killAllClusters(True)
raise
- def test_Cluster_03_AddRemoveNodes(self):
+ def test_03_AddRemoveNodes(self):
"""Create a multi-node cluster, then kill some nodes and add some new ones (not those killed)"""
try:
clusterName = "cluster-03"
@@ -68,7 +68,7 @@
self.killAllClusters(True)
raise
- def test_Cluster_04_RemoveRestoreNodes(self):
+ def test_04_RemoveRestoreNodes(self):
"""Create a multi-node cluster, then kill some of the nodes and restart them"""
try:
clusterName = "cluster-04"
@@ -95,7 +95,7 @@
self.killAllClusters(True)
raise
- def test_Cluster_05_KillAllNodesThenRecover(self):
+ def test_05_KillAllNodesThenRecover(self):
"""Create a multi-node cluster, then kill *all* nodes, then restart the cluster"""
try:
clusterName = "cluster-05"
@@ -107,7 +107,7 @@
self.killAllClusters(True)
raise
- def test_Cluster_06_PublishConsume(self):
+ def test_06_PublishConsume(self):
"""Publish then consume 100 messages from a single cluster"""
try:
dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-06", 3, "test-exchange-06", ["test-queue-06"])
@@ -117,7 +117,7 @@
self.killAllClusters(True)
raise
- def test_Cluster_07_MultiplePublishConsume(self):
+ def test_07_MultiplePublishConsume(self):
"""Staggered publish and consume on a single cluster"""
try:
dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-07", 3, "test-exchange-07", ["test-queue-07"])
@@ -135,7 +135,7 @@
self.killAllClusters(True)
raise
- def test_Cluster_08_MsgPublishConsumeAddRemoveNodes(self):
+ def test_08_MsgPublishConsumeAddRemoveNodes(self):
"""Staggered publish and consume interleaved with adding and removing nodes on a single cluster"""
try:
dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-08", 3, "test-exchange-08", ["test-queue-08"])
@@ -159,7 +159,7 @@
self.killAllClusters(True)
raise
- def test_Cluster_09_MsgPublishConsumeRemoveRestoreNodes(self):
+ def test_09_MsgPublishConsumeRemoveRestoreNodes(self):
"""Publish and consume messages interleaved with adding and restoring previous nodes on a single cluster"""
try:
dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-09", 6, "test-exchange-09", ["test-queue-09"])
@@ -184,7 +184,7 @@
self.killAllClusters(True)
raise
- def test_Cluster_10_LinearNodeKillCreateProgression(self):
+ def test_10_LinearNodeKillCreateProgression(self):
"""Publish and consume messages while linearly killing all original nodes and replacing them with new ones"""
try:
dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-10", 4, "test-exchange-10", ["test-queue-10"])
@@ -204,7 +204,7 @@
self.killAllClusters(True)
raise
- def test_Cluster_11_CircularNodeKillRestoreProgression(self):
+ def test_11_CircularNodeKillRestoreProgression(self):
"""Publish and consume messages while circularly killing all original nodes and restoring them again"""
try:
dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-11", 4, "test-exchange-11", ["test-queue-11"])
@@ -226,7 +226,7 @@
self.killAllClusters(True)
raise
- def test_Cluster_12_KillAllNodesRecoverMessages(self):
+ def test_12_KillAllNodesRecoverMessages(self):
"""Create a cluster, add and delete messages, kill all nodes then recover cluster and messages"""
if not self._storeEnable:
print " No store loaded, skipped"
@@ -253,7 +253,7 @@
self.killAllClusters(True)
raise
- def test_Cluster_13_TopicExchange(self):
+ def test_13_TopicExchange(self):
"""Create topic exchange in a cluster and make sure it behaves correctly"""
try:
topicQueueNameKeyList = {"test-queue-13-A" : "#.A", "test-queue-13-B" : "*.B", "test-queue-13-C" : "C.#", "test-queue-13-D" : "D.*"}
@@ -290,7 +290,80 @@
self.killAllClusters(True)
raise
- def test_Cluster_14_FanoutExchange(self):
+ def test_14_FanoutExchange(self):
+ """Create fanout exchange in a cluster and make sure it behaves correctly"""
+ try:
+ fanoutQueueNameList = ["test-queue-14-A", "test-queue-14-B", "test-queue-14-C"]
+ fh = TestBaseCluster.FanoutExchangeTestHelper(self, "cluster-14", 4, "test-exchange-14", fanoutQueueNameList)
+ # Place initial 20 messages, retrieve 10
+ fh.sendMsgs(20)
+ fh.receiveMsgs(10)
+ # Kill and add some nodes
+ fh.killNode(0)
+ fh.killNode(2)
+ fh.addNodes(2)
+ # Place another 20 messages, retrieve 20
+ fh.sendMsgs(20)
+ fh.receiveMsgs(20)
+ # Kill and add another node
+ fh.killNode(4)
+ fh.addNodes()
+ # Add another 2 queues
+ fh.addQueues(["test-queue-14-D", "test-queue-14-E"])
+ # Place another 20 messages, retrieve 20
+ fh.sendMsgs(20)
+ fh.receiveMsgs(20)
+ # Kill all nodes but one
+ fh.killNode(1)
+ fh.killNode(3)
+ fh.killNode(6)
+ # Check messages
+ fh.finalizeTest()
+ except:
+ self.killAllClusters(True)
+ raise
+
+class LongTests(TestBaseCluster):
+ """Basic cluster with async store tests"""
+
+ def test_01_TopicExchange(self):
+ """Create topic exchange in a cluster and make sure it behaves correctly"""
+ try:
+ topicQueueNameKeyList = {"test-queue-13-A" : "#.A", "test-queue-13-B" : "*.B", "test-queue-13-C" : "C.#", "test-queue-13-D" : "D.*"}
+ th = TestBaseCluster.TopicExchangeTestHelper(self, "cluster-13", 4, "test-exchange-13", topicQueueNameKeyList)
+ # Place initial messages
+ th.sendMsgs("C.hello.A", 10)
+ th.sendMsgs("hello.world", 10) # matches none of the queues
+ th.sendMsgs("D.hello.A", 10)
+ th.sendMsgs("hello.B", 20)
+ th.sendMsgs("D.hello", 20)
+ # Kill and add some nodes
+ th.killNode(0)
+ th.killNode(2)
+ th.addNodes(2)
+ # Pull 10 messages from each queue
+ th.receiveMsgs(10)
+ # Kill and add another node
+ th.killNode(4)
+ th.addNodes()
+ # Add two more queues
+ th.addQueues({"test-queue-13-E" : "#.bye.A", "test-queue-13-F" : "#.bye.B"})
+ # Place more messages
+ th.sendMsgs("C.bye.A", 10)
+ th.sendMsgs("hello.bye", 20) # matches none of the queues
+ th.sendMsgs("hello.bye.B", 20)
+ th.sendMsgs("D.bye", 20)
+ # Kill all nodes but one
+ th.killNode(1)
+ th.killNode(3)
+ th.killNode(6)
+ # Pull all remaining messages from each queue and check messages
+ th.finalizeTest()
+ except:
+ self.killAllClusters(True)
+ raise
+
+ def test_02_FanoutExchange(self):
"""Create fanout exchange in a cluster and make sure it behaves correctly"""
try:
fanoutQueueNameList = ["test-queue-14-A", "test-queue-14-B", "test-queue-14-C"]
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/dlclose_noop.c
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/dlclose_noop.c?rev=821761&r1=821760&r2=821761&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/dlclose_noop.c (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/dlclose_noop.c Mon Oct 5 11:29:03 2009
@@ -26,5 +26,5 @@
*/
#include <stdio.h>
-void* dlclose(void* handle) {}
+void* dlclose(void* handle) { return 0; }
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org