You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/02/01 19:02:57 UTC
svn commit: r617582 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/broker/
qpid/cluster/ tests/
Author: aconway
Date: Fri Feb 1 10:02:42 2008
New Revision: 617582
URL: http://svn.apache.org/viewvc?rev=617582&view=rev
Log:
Cluster code fixed for changes in codebase.
- Using SessionManager::Observer
- Better AIS test setup: the user only needs to be a member of the ais group.
- Update cluster_client
- SessionState holds handler chains.
- Cluster frames include next handler ptr.
Added:
incubator/qpid/trunk/qpid/cpp/src/tests/ais_run (with props)
incubator/qpid/trunk/qpid/cpp/src/tests/ais_test.cpp (with props)
Removed:
incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.h
incubator/qpid/trunk/qpid/cpp/src/tests/Cluster_child.cpp
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/Cpg.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/ais_check
incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk
incubator/qpid/trunk/qpid/cpp/src/tests/cluster_client.cpp
incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp?rev=617582&r1=617581&r2=617582&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp Fri Feb 1 10:02:42 2008
@@ -43,12 +43,13 @@
SessionManager::~SessionManager() {}
+// FIXME aconway 2008-02-01: pass handler*, allow open unattached.
std::auto_ptr<SessionState> SessionManager::open(
SessionHandler& h, uint32_t timeout_)
{
Mutex::ScopedLock l(lock);
std::auto_ptr<SessionState> session(
- new SessionState(*this, h, timeout_, ack));
+ new SessionState(this, &h, timeout_, ack));
active.insert(session->getId());
for_each(observers.begin(), observers.end(),
boost::bind(&Observer::opened, _1,boost::ref(*session)));
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=617582&r1=617581&r2=617582&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Fri Feb 1 10:02:42 2008
@@ -36,23 +36,17 @@
using qpid::management::Manageable;
using qpid::management::Args;
-void SessionState::handleIn(AMQFrame& f) { semanticHandler->handle(f); }
-
-void SessionState::handleOut(AMQFrame& f) {
- assert(handler);
- handler->out.handle(f);
-}
-
SessionState::SessionState(
- SessionManager& f, SessionHandler& h, uint32_t timeout_, uint32_t ack)
+ SessionManager* f, SessionHandler* h, uint32_t timeout_, uint32_t ack)
: framing::SessionState(ack, timeout_ > 0),
- factory(f), handler(&h), id(true), timeout(timeout_),
- broker(h.getConnection().broker),
- version(h.getConnection().getVersion()),
+ factory(f), handler(h), id(true), timeout(timeout_),
+ broker(h->getConnection().broker),
+ version(h->getConnection().getVersion()),
semanticHandler(new SemanticHandler(*this))
{
- // TODO aconway 2007-09-20: SessionManager may add plugin
- // handlers to the chain.
+ in.next = semanticHandler.get();
+ out.next = &handler->out;
+
getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState());
Manageable* parent = broker.GetVhostObject ();
@@ -66,8 +60,8 @@
mgmtObject = management::Session::shared_ptr
(new management::Session (this, parent, id.str ()));
mgmtObject->set_attached (1);
- mgmtObject->set_clientRef (h.getConnection().GetManagementObject()->getObjectId());
- mgmtObject->set_channelId (h.getChannel());
+ mgmtObject->set_clientRef (h->getConnection().GetManagementObject()->getObjectId());
+ mgmtObject->set_channelId (h->getChannel());
mgmtObject->set_detachedLifespan (getTimeout());
agent->addObject (mgmtObject);
}
@@ -76,12 +70,10 @@
SessionState::~SessionState() {
// Remove ID from active session list.
- factory.erase(getId());
-
+ if (factory)
+ factory->erase(getId());
if (mgmtObject.get () != 0)
- {
mgmtObject->resourceDestroy ();
- }
}
SessionHandler* SessionState::getHandler() {
@@ -101,7 +93,7 @@
void SessionState::detach() {
getConnection().outputTasks.removeOutputTask(&semanticHandler->getSemanticState());
Mutex::ScopedLock l(lock);
- handler = 0;
+ handler = 0; out.next = 0;
if (mgmtObject.get() != 0)
{
mgmtObject->set_attached (0);
@@ -112,6 +104,7 @@
{
Mutex::ScopedLock l(lock);
handler = &h;
+ out.next = &handler->out;
if (mgmtObject.get() != 0)
{
mgmtObject->set_attached (1);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=617582&r1=617581&r2=617582&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Fri Feb 1 10:02:42 2008
@@ -58,7 +58,7 @@
* themselves have state.
*/
class SessionState : public framing::SessionState,
- public framing::FrameHandler::InOutHandler,
+ public framing::FrameHandler::Chains,
public sys::OutputControl,
public management::Manageable
{
@@ -90,18 +90,15 @@
management::Manageable::status_t
ManagementMethod (uint32_t methodId, management::Args& args);
- protected:
- void handleIn(framing::AMQFrame&);
- void handleOut(framing::AMQFrame&);
-
- private:
- // SessionManager creates sessions.
- SessionState(SessionManager&,
- SessionHandler& out,
+ // Normally SessionManager creates sessions.
+ SessionState(SessionManager*,
+ SessionHandler* out,
uint32_t timeout,
uint32_t ackInterval);
- SessionManager& factory;
+
+ private:
+ SessionManager* factory;
SessionHandler* handler;
framing::Uuid id;
uint32_t timeout;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=617582&r1=617581&r2=617582&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Feb 1 10:02:42 2008
@@ -17,6 +17,7 @@
*/
#include "Cluster.h"
+#include "qpid/broker/SessionState.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/ClusterNotifyBody.h"
#include "qpid/log/Statement.h"
@@ -31,7 +32,70 @@
using namespace qpid::framing;
using namespace qpid::sys;
using namespace std;
+using broker::SessionState;
+namespace {
+
+// Beginning of inbound chain: send to cluster.
+struct ClusterSendHandler : public FrameHandler {
+ SessionState& session;
+ Cluster& cluster;
+ bool busy;
+ Monitor lock;
+
+ ClusterSendHandler(SessionState& s, Cluster& c) : session(s), cluster(c), busy(false) {}
+
+ void handle(AMQFrame& f) {
+ Mutex::ScopedLock l(lock);
+ assert(!busy);
+ // FIXME aconway 2008-01-29: refcount Sessions.
+ // session.addRef(); // Keep the session till the message is self delivered.
+ cluster.send(f, next); // Indirectly send to next via cluster.
+
+ // FIXME aconway 2008-01-29: need to get this blocking out of the loop.
+ // But cluster needs to agree on order of side-effects on the shared model.
+ // OK for wiring to block, for messages use queue tokens?
+ // Both in & out transfers must be ordered per queue.
+ // May need out-of-order completion.
+ busy=true;
+ while (busy) lock.wait();
+ }
+};
+
+// Next in inbound chain, self delivered from cluster.
+struct ClusterDeliverHandler : public FrameHandler {
+ Cluster& cluster;
+ ClusterSendHandler& sender;
+
+ ClusterDeliverHandler(ClusterSendHandler& prev, Cluster& c) : cluster(c), sender(prev) {}
+
+ void handle(AMQFrame& f) {
+ next->handle(f);
+ Mutex::ScopedLock l(sender.lock);
+ sender.busy=false;
+ sender.lock.notify();
+ }
+};
+
+// FIXME aconway 2008-01-29: IList
+void insert(FrameHandler::Chain& c, FrameHandler* h) {
+ h->next = c.next;
+ c.next = h;
+}
+
+struct SessionObserver : public broker::SessionManager::Observer {
+ Cluster& cluster;
+ SessionObserver(Cluster& c) : cluster(c) {}
+
+ void opened(SessionState& s) {
+ // FIXME aconway 2008-01-29: IList for memory management.
+ ClusterSendHandler* sender=new ClusterSendHandler(s, cluster);
+ ClusterDeliverHandler* deliverer=new ClusterDeliverHandler(*sender, cluster);
+ insert(s.in, deliverer);
+ insert(s.in, sender);
+ }
+};
+}
ostream& operator <<(ostream& out, const Cluster& cluster) {
return out << "cluster[" << cluster.name.str() << " " << cluster.self << "]";
@@ -48,10 +112,10 @@
}
Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker&) :
- FrameHandler(0), // FIXME aconway 2008-01-29: handler. + observer
cpg(*this),
name(name_),
- url(url_)
+ url(url_),
+ observer(new SessionObserver(*this))
{
QPID_LOG(trace, *this << " Joining cluster: " << name_);
cpg.join(name);
@@ -77,18 +141,19 @@
}
}
-void Cluster::handle(AMQFrame& frame) {
+void Cluster::send(AMQFrame& frame, FrameHandler* next) {
QPID_LOG(trace, *this << " SEND: " << frame);
- boost::scoped_array<char> store(new char[frame.size()]); // FIXME aconway 2008-01-29: Better buffer handling.
- Buffer buf(store.get());
+ char data[65536]; // FIXME aconway 2008-01-29: Better buffer handling.
+ Buffer buf(data);
frame.encode(buf);
- iovec iov = { store.get(), frame.size() };
+ buf.putRawData((uint8_t*)&next, sizeof(next)); // Tag the frame with the next pointer.
+ iovec iov = { data, frame.size()+sizeof(next) };
cpg.mcast(name, &iov, 1);
}
void Cluster::notify() {
AMQFrame frame(in_place<ClusterNotifyBody>(ProtocolVersion(), url.str()));
- handle(frame);
+ send(frame, 0);
}
size_t Cluster::size() const {
@@ -112,15 +177,25 @@
void* msg,
int msg_len)
{
- Id from(nodeid, pid);
- Buffer buf(static_cast<char*>(msg), msg_len);
- AMQFrame frame;
- frame.decode(buf);
- QPID_LOG(trace, *this << " RECV: " << frame << " from: " << from);
- if (frame.getChannel() == 0)
- handleClusterFrame(from, frame);
- else
- next->handle(frame);
+ try {
+ Id from(nodeid, pid);
+ Buffer buf(static_cast<char*>(msg), msg_len);
+ AMQFrame frame;
+ frame.decode(buf);
+ QPID_LOG(trace, *this << " RECV: " << frame << " from: " << from);
+ if (frame.getChannel() == 0)
+ handleClusterFrame(from, frame);
+ else if (from == self) {
+ FrameHandler* next;
+ buf.getRawData((uint8_t*)&next, sizeof(next));
+ next->handle(frame);
+ }
+ // FIXME aconway 2008-01-30: apply frames from foreign sessions.
+ }
+ catch (const std::exception& e) {
+ // FIXME aconway 2008-01-30: exception handling.
+ QPID_LOG(error, "Error handling frame from cluster " << e.what());
+ }
}
bool Cluster::wait(boost::function<bool(const Cluster&)> predicate,
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=617582&r1=617581&r2=617582&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Fri Feb 1 10:02:42 2008
@@ -21,7 +21,6 @@
#include "Cpg.h"
-#include "qpid/framing/FrameHandler.h"
#include "qpid/broker/Broker.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Runnable.h"
@@ -39,15 +38,10 @@
namespace qpid { namespace cluster {
/**
- * Connection to the cluster. Maintains cluster membership
- * data.
- *
- * As FrameHandler, handles frames by sending them to the
- * cluster. Frames received from the cluster are sent to the next
- * FrameHandler in the chain.
+ * Connection to the cluster.
+ * Keeps cluster membership data.
*/
-class Cluster : public framing::FrameHandler,
- private sys::Runnable, private Cpg::Handler
+class Cluster : private sys::Runnable, private Cpg::Handler
{
public:
/** Details of a cluster member */
@@ -68,7 +62,7 @@
virtual ~Cluster();
// FIXME aconway 2008-01-29:
- //framing::HandlerUpdater& getHandlerUpdater() { return sessions; }
+ intrusive_ptr<broker::SessionManager::Observer> getObserver() { return observer; }
/** Get the current cluster membership. */
MemberList getMembers() const;
@@ -87,7 +81,7 @@
sys::Duration timeout=sys::TIME_INFINITE) const;
/** Send frame to the cluster */
- void handle(framing::AMQFrame&);
+ void send(framing::AMQFrame&, framing::FrameHandler*);
private:
typedef Cpg::Id Id;
@@ -122,6 +116,7 @@
MemberMap members;
sys::Thread dispatcher;
boost::function<void()> callback;
+ intrusive_ptr<broker::SessionManager::Observer> observer;
friend std::ostream& operator <<(std::ostream&, const Cluster&);
friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=617582&r1=617581&r2=617582&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Fri Feb 1 10:02:42 2008
@@ -69,7 +69,7 @@
cluster = boost::in_place(options.name,
options.getUrl(broker->getPort()),
boost::ref(*broker));
- // FIXME aconway 2008-02-01: Add observer.
+ broker->getSessionManager().add(cluster->getObserver());
}
}
};
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Cpg.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Cpg.cpp?rev=617582&r1=617581&r2=617582&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Cpg.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Cpg.cpp Fri Feb 1 10:02:42 2008
@@ -78,12 +78,16 @@
cpg_handle_t /*handle*/,
struct cpg_name *grp,
struct cpg_address */*members*/, int nMembers,
- struct cpg_address */*left*/, int /*nLeft*/,
- struct cpg_address */*joined*/, int /*nJoined*/
+ struct cpg_address */*left*/, int nLeft,
+ struct cpg_address */*joined*/, int nJoined
)
{
BOOST_CHECK_EQUAL(group, Cpg::str(*grp));
configChanges.push_back(nMembers);
+ BOOST_MESSAGE("configChange: "<<
+ nLeft<<" left "<<
+ nJoined<<" joined "<<
+ nMembers<<" members.");
}
};
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ais_check
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ais_check?rev=617582&r1=617581&r2=617582&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ais_check (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ais_check Fri Feb 1 10:02:42 2008
@@ -2,10 +2,12 @@
# Check for requirements, run AIS tests if found.
#
-test `id -ng` = "ais" || BADGROUP=yes
-{ ps -u root | grep aisexec >/dev/null; } || NOAISEXEC=yes
+id -nG | grep '\<ais\>' || \
+ NOGROUP="You are not a member of the ais group."
+ps -u root | grep aisexec >/dev/null || \
+ NOAISEXEC="The aisexec daemon is not running as root"
-if test -n "$BADGROUP" -o -n "$NOAISEXEC"; then
+if test -n "$NOGROUP" -o -n "$NOAISEXEC"; then
cat <<EOF
=========== WARNING: NOT RUNNING AIS TESTS ==============
@@ -13,18 +15,8 @@
Tests that depend on the openais library (used for clustering)
will not be run because:
-EOF
- test -n "$BADGROUP" && cat <<EOF
- You do not appear to have you group ID set to "ais". Make ais your
- primary group, or run "newgrp ais" before running the tests.
-
-EOF
- test -n "$NOAISEXEC" && cat <<EOF
- The aisexec daemon is not running. Make sure /etc/ais/openais.conf
- is a valid configuration and aisexec is run by root.
-EOF
-
- cat <<EOF
+ $NOGROUP
+ $NOAISEXEC
==========================================================
@@ -32,8 +24,4 @@
exit 0; # A warning, not a failure.
fi
-FAILED=0
-for test in `cat ais_tests`; do
- ./$test || FAILED=`expr $FAILED + 1`
-done
-exit $FAILED
+echo ./ais_run | newgrp ais
Added: incubator/qpid/trunk/qpid/cpp/src/tests/ais_run
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ais_run?rev=617582&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ais_run (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ais_run Fri Feb 1 10:02:42 2008
@@ -0,0 +1,15 @@
+#!/bin/sh
+#
+# Run AIS tests, assumes that ais_check has passed and we are
+# running with the ais group ID.
+#
+
+# FIXME aconway 2008-01-30: we should valgrind the cluster brokers.
+
+srcdir=`dirname $0`
+$srcdir/start_cluster 4
+./ais_test
+ret=$?
+$srcdir/stop_cluster
+exit $ret
+
Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/ais_run
------------------------------------------------------------------------------
svn:executable = *
Added: incubator/qpid/trunk/qpid/cpp/src/tests/ais_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ais_test.cpp?rev=617582&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ais_test.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ais_test.cpp Fri Feb 1 10:02:42 2008
@@ -0,0 +1,23 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Defines test_main function to link with actual unit test code.
+#define BOOST_AUTO_TEST_MAIN // Boost 1.33
+#define BOOST_TEST_MAIN
+#include "unit_test.h"
+
Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/ais_test.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/ais_test.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk?rev=617582&r1=617581&r2=617582&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk Fri Feb 1 10:02:42 2008
@@ -1,55 +1,20 @@
-# FIXME aconway 2007-08-31: Disabled cluster compilation,
-# has not been kept up to date with recent commits.
+if CLUSTER
+#
+# Cluster tests makefile fragment, to be included in Makefile.am
#
-# if CLUSTER
-# # Cluster tests makefile fragment, to be included in Makefile.am
-# #
+lib_cluster = $(abs_builddir)/../libqpidcluster.la
-# lib_cluster = $(abs_builddir)/../libqpidcluster.la
-
-# # NOTE: Programs using the openais library must be run with gid=ais
-# # You should do "newgrp ais" before running the tests to run these.
-# #
-
-# #
-# # Cluster tests.
-# #
-
-# # ais_check runs ais if the conditions to run AIS tests
-# # are met, otherwise it prints a warning.
-# TESTS+=ais_check
-# EXTRA_DIST+=ais_check
-# AIS_TESTS=
-
-# ais_check: ais_tests
-# ais_tests:
-# echo $(AIS_TESTS)
-# echo "# AIS tests" >$@
-# for t in $(AIS_TESTS); do echo ./$$t >$@; done
-# chmod a+x $@
-
-# CLEANFILES+=ais_tests
-
-# AIS_TESTS+=Cpg
-# check_PROGRAMS+=Cpg
-# Cpg_SOURCES=Cpg.cpp
-# Cpg_LDADD=$(lib_cluster) -lboost_unit_test_framework
-
-# # TODO aconway 2007-07-26: Fix this test.
-# #AIS_TESTS+=Cluster
-# # check_PROGRAMS+=Cluster
-# # Cluster_SOURCES=Cluster.cpp Cluster.h
-# # Cluster_LDADD=$(lib_cluster) -lboost_unit_test_framework
-
-# check_PROGRAMS+=Cluster_child
-# Cluster_child_SOURCES=Cluster_child.cpp Cluster.h
-# Cluster_child_LDADD=$(lib_cluster) -lboost_test_exec_monitor
+# NOTE: Programs using the openais library must be run with gid=ais
+# You should do "newgrp ais" before running the tests to run these.
+#
-# # TODO aconway 2007-07-03: In progress
-# #AIS_TESTS+=cluster_client
-# check_PROGRAMS+=cluster_client
-# cluster_client_SOURCES=cluster_client.cpp
-# cluster_client_LDADD=$(lib_client) -lboost_unit_test_framework
+# ais_check checks conditions for AIS tests and runs if ok.
+TESTS+=ais_check
+EXTRA_DIST+=ais_check ais_run
+
+check_PROGRAMS+=ais_test
+ais_test_SOURCES=ais_test.cpp Cpg.cpp
+ais_test_LDADD=$(lib_client) $(lib_cluster) -lboost_unit_test_framework
-# endif
+endif
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_client.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_client.cpp?rev=617582&r1=617581&r2=617582&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_client.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_client.cpp Fri Feb 1 10:02:42 2008
@@ -16,21 +16,25 @@
*
*/
-#include "qpid/client/Connection.h"
-#include "qpid/shared_ptr.h"
-
#include "unit_test.h"
+#include "BrokerFixture.h"
+#include "qpid/client/Session.h"
#include <fstream>
#include <vector>
#include <functional>
-
QPID_AUTO_TEST_SUITE(cluster_clientTestSuite)
-using namespace std;
using namespace qpid;
using namespace qpid::client;
+using namespace qpid::framing;
+using namespace qpid::client::arg;
+using framing::TransferContent;
+using std::vector;
+using std::string;
+using std::ifstream;
+using std::ws;
struct ClusterConnections : public vector<shared_ptr<Connection> > {
ClusterConnections() {
@@ -58,25 +62,23 @@
ClusterConnections cluster;
BOOST_REQUIRE(cluster.size() > 1);
- Exchange fooEx("FooEx", Exchange::TOPIC_EXCHANGE);
- Queue fooQ("FooQ");
-
- Channel broker0;
- cluster[0]->openChannel(broker0);
- broker0.declareExchange(fooEx);
- broker0.declareQueue(fooQ);
- broker0.bind(fooEx, fooQ, "FooKey");
+ Session broker0 = cluster[0]->newSession();
+ broker0.exchangeDeclare(exchange="ex");
+ broker0.queueDeclare(queue="q");
+ broker0.queueBind(exchange="ex", queue="q", routingKey="key");
broker0.close();
for (size_t i = 1; i < cluster.size(); ++i) {
- Channel ch;
- cluster[i]->openChannel(ch);
- ch.publish(Message("hello"), fooEx, "FooKey");
- Message m;
- BOOST_REQUIRE(ch.get(m, fooQ));
- BOOST_REQUIRE_EQUAL(m.getData(), "hello");
- ch.close();
- }
+ Session s = cluster[i]->newSession();
+ s.messageTransfer(content=TransferContent("data", "key", "ex"));
+ s.messageSubscribe(queue="q", destination="q");
+ s.messageFlow(destination="q", unit=0, value=1);//messages
+ FrameSet::shared_ptr msg = s.get();
+ BOOST_CHECK(msg->isA<MessageTransferBody>());
+ BOOST_CHECK_EQUAL(string("data"), msg->getContent());
+ s.getExecution().completed(msg->getId(), true, true);
+ cluster[i]->close();
+ }
}
QPID_AUTO_TEST_SUITE_END()
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster?rev=617582&r1=617581&r2=617582&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster Fri Feb 1 10:02:42 2008
@@ -12,7 +12,7 @@
OPTS=$*
CLUSTER=`whoami` # Cluster name=user name, avoid clashes.
for (( i=0; i<SIZE; ++i )); do
- PORT=`../qpidd -dp0 --log-output=cluster$i.log --cluster $CLUSTER $OPTS` || exit 1
+ PORT=`../qpidd --load-module ../.libs/libqpidcluster.so -dp0 --log-output=cluster$i.log --cluster-name $CLUSTER $OPTS` || exit 1
echo $PORT >> cluster.ports
done