You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/02/01 19:02:57 UTC

svn commit: r617582 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/broker/ qpid/cluster/ tests/

Author: aconway
Date: Fri Feb  1 10:02:42 2008
New Revision: 617582

URL: http://svn.apache.org/viewvc?rev=617582&view=rev
Log:

Cluster code fixed for changes in codebase.
 - Using SessionManager::Observer 
 - Better ais test setup, only need to be member of ais group.
 - Update cluster_client
 - SessionState holds handler chains.
 - Cluster frames include next handler ptr.

Added:
    incubator/qpid/trunk/qpid/cpp/src/tests/ais_run   (with props)
    incubator/qpid/trunk/qpid/cpp/src/tests/ais_test.cpp   (with props)
Removed:
    incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/tests/Cluster_child.cpp
Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/Cpg.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/ais_check
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster_client.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp?rev=617582&r1=617581&r2=617582&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp Fri Feb  1 10:02:42 2008
@@ -43,12 +43,13 @@
 
 SessionManager::~SessionManager() {}
 
+// FIXME aconway 2008-02-01: pass handler*, allow open  unattached.
 std::auto_ptr<SessionState>  SessionManager::open(
     SessionHandler& h, uint32_t timeout_)
 {
     Mutex::ScopedLock l(lock);
     std::auto_ptr<SessionState> session(
-        new SessionState(*this, h, timeout_, ack));
+        new SessionState(this, &h, timeout_, ack));
     active.insert(session->getId());
     for_each(observers.begin(), observers.end(),
              boost::bind(&Observer::opened, _1,boost::ref(*session)));

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=617582&r1=617581&r2=617582&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Fri Feb  1 10:02:42 2008
@@ -36,23 +36,17 @@
 using qpid::management::Manageable;
 using qpid::management::Args;
 
-void SessionState::handleIn(AMQFrame& f) { semanticHandler->handle(f); }
-
-void SessionState::handleOut(AMQFrame& f) {
-    assert(handler);
-    handler->out.handle(f);
-}
-
 SessionState::SessionState(
-    SessionManager& f, SessionHandler& h, uint32_t timeout_, uint32_t ack) 
+    SessionManager* f, SessionHandler* h, uint32_t timeout_, uint32_t ack) 
     : framing::SessionState(ack, timeout_ > 0),
-      factory(f), handler(&h), id(true), timeout(timeout_),
-      broker(h.getConnection().broker),
-      version(h.getConnection().getVersion()),
+      factory(f), handler(h), id(true), timeout(timeout_),
+      broker(h->getConnection().broker),
+      version(h->getConnection().getVersion()),
       semanticHandler(new SemanticHandler(*this))
 {
-    // TODO aconway 2007-09-20: SessionManager may add plugin
-    // handlers to the chain.
+    in.next = semanticHandler.get();
+    out.next = &handler->out;
+
     getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState());
 
     Manageable* parent = broker.GetVhostObject ();
@@ -66,8 +60,8 @@
             mgmtObject = management::Session::shared_ptr
                 (new management::Session (this, parent, id.str ()));
             mgmtObject->set_attached (1);
-            mgmtObject->set_clientRef (h.getConnection().GetManagementObject()->getObjectId());
-            mgmtObject->set_channelId (h.getChannel());
+            mgmtObject->set_clientRef (h->getConnection().GetManagementObject()->getObjectId());
+            mgmtObject->set_channelId (h->getChannel());
             mgmtObject->set_detachedLifespan (getTimeout());
             agent->addObject (mgmtObject);
         }
@@ -76,12 +70,10 @@
 
 SessionState::~SessionState() {
     // Remove ID from active session list.
-    factory.erase(getId());
-
+    if (factory)
+        factory->erase(getId());
     if (mgmtObject.get () != 0)
-    {
         mgmtObject->resourceDestroy ();
-    }
 }
 
 SessionHandler* SessionState::getHandler() {
@@ -101,7 +93,7 @@
 void SessionState::detach() {
     getConnection().outputTasks.removeOutputTask(&semanticHandler->getSemanticState());
     Mutex::ScopedLock l(lock);
-    handler = 0;
+    handler = 0; out.next = 0; 
     if (mgmtObject.get() != 0)
     {
         mgmtObject->set_attached  (0);
@@ -112,6 +104,7 @@
     {
         Mutex::ScopedLock l(lock);
         handler = &h;
+        out.next = &handler->out;
         if (mgmtObject.get() != 0)
         {
             mgmtObject->set_attached (1);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=617582&r1=617581&r2=617582&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Fri Feb  1 10:02:42 2008
@@ -58,7 +58,7 @@
  * themselves have state. 
  */
 class SessionState : public framing::SessionState,
-    public framing::FrameHandler::InOutHandler,
+    public framing::FrameHandler::Chains,
     public sys::OutputControl,
     public management::Manageable
 {
@@ -90,18 +90,15 @@
     management::Manageable::status_t
         ManagementMethod (uint32_t methodId, management::Args& args);
 
-  protected:
-    void handleIn(framing::AMQFrame&);
-    void handleOut(framing::AMQFrame&);
-    
-  private:
-    // SessionManager creates sessions.
-    SessionState(SessionManager&,
-                 SessionHandler& out,
+    // Normally SessionManager creates sessions.
+    SessionState(SessionManager*,
+                 SessionHandler* out,
                  uint32_t timeout,
                  uint32_t ackInterval);
     
-    SessionManager& factory;
+
+  private:
+    SessionManager* factory;
     SessionHandler* handler;    
     framing::Uuid id;
     uint32_t timeout;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=617582&r1=617581&r2=617582&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Feb  1 10:02:42 2008
@@ -17,6 +17,7 @@
  */
 
 #include "Cluster.h"
+#include "qpid/broker/SessionState.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/ClusterNotifyBody.h"
 #include "qpid/log/Statement.h"
@@ -31,7 +32,70 @@
 using namespace qpid::framing;
 using namespace qpid::sys;
 using namespace std;
+using broker::SessionState;
 
+namespace {
+
+// Beginning of inbound chain: send to cluster.
+struct ClusterSendHandler : public FrameHandler {
+    SessionState& session;
+    Cluster& cluster;
+    bool busy;
+    Monitor lock;
+    
+    ClusterSendHandler(SessionState& s, Cluster& c) : session(s), cluster(c), busy(false) {}
+
+    void handle(AMQFrame& f) {
+        Mutex::ScopedLock l(lock);
+        assert(!busy);
+        // FIXME aconway 2008-01-29: refcount Sessions.
+        // session.addRef();             // Keep the session till the message is self delivered.
+        cluster.send(f, next);        // Indirectly send to next via cluster.
+
+        // FIXME aconway 2008-01-29: need to get this blocking out of the loop.
+        // But cluster needs to agree on order of side-effects on the shared model.
+        // OK for wiring to block, for messages use queue tokens?
+        // Both in & out transfers must be orderd per queue.
+        // May need out-of-order completion.
+        busy=true;
+        while (busy) lock.wait();
+    }
+};
+
+// Next in inbound chain, self delivered from cluster.
+struct ClusterDeliverHandler : public FrameHandler {
+    Cluster& cluster;
+    ClusterSendHandler& sender;
+
+    ClusterDeliverHandler(ClusterSendHandler& prev, Cluster& c) : cluster(c), sender(prev) {}
+    
+    void handle(AMQFrame& f) {
+        next->handle(f);
+        Mutex::ScopedLock l(sender.lock);
+        sender.busy=false;
+        sender.lock.notify();
+    }
+};
+
+// FIXME aconway 2008-01-29: IList
+void insert(FrameHandler::Chain& c, FrameHandler* h) {
+    h->next = c.next;
+    c.next = h;
+}
+
+struct SessionObserver : public broker::SessionManager::Observer {
+    Cluster& cluster;
+    SessionObserver(Cluster& c) : cluster(c) {}
+    
+    void opened(SessionState& s) {
+        // FIXME aconway 2008-01-29: IList for memory management.
+        ClusterSendHandler* sender=new ClusterSendHandler(s, cluster);
+        ClusterDeliverHandler* deliverer=new ClusterDeliverHandler(*sender, cluster);
+        insert(s.in, deliverer);
+        insert(s.in, sender);
+    }
+};
+}
 
 ostream& operator <<(ostream& out, const Cluster& cluster) {
     return out << "cluster[" << cluster.name.str() << " " << cluster.self << "]";
@@ -48,10 +112,10 @@
 }
 
 Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker&) :
-    FrameHandler(0),            // FIXME aconway 2008-01-29: handler. + observer
     cpg(*this),
     name(name_),
-    url(url_)
+    url(url_),
+    observer(new SessionObserver(*this))
 {
     QPID_LOG(trace, *this << " Joining cluster: " << name_);
     cpg.join(name);
@@ -77,18 +141,19 @@
     }
 }
 
-void Cluster::handle(AMQFrame& frame) {
+void Cluster::send(AMQFrame& frame, FrameHandler* next) {
     QPID_LOG(trace, *this << " SEND: " << frame);
-    boost::scoped_array<char> store(new char[frame.size()]);          // FIXME aconway 2008-01-29: Better buffer handling.
-    Buffer buf(store.get());
+    char data[65536]; // FIXME aconway 2008-01-29: Better buffer handling.
+    Buffer buf(data);
     frame.encode(buf);
-    iovec iov = { store.get(), frame.size() };
+    buf.putRawData((uint8_t*)&next, sizeof(next)); // Tag the frame with the next pointer.
+    iovec iov = { data, frame.size()+sizeof(next) };
     cpg.mcast(name, &iov, 1);
 }
 
 void Cluster::notify() {
     AMQFrame frame(in_place<ClusterNotifyBody>(ProtocolVersion(), url.str()));
-    handle(frame);
+    send(frame, 0);
 }
 
 size_t Cluster::size() const {
@@ -112,15 +177,25 @@
     void* msg,
     int msg_len)
 {
-    Id from(nodeid, pid);
-    Buffer buf(static_cast<char*>(msg), msg_len);
-    AMQFrame frame;
-    frame.decode(buf);
-    QPID_LOG(trace, *this << " RECV: " << frame << " from: " << from);
-    if (frame.getChannel() == 0)
-        handleClusterFrame(from, frame);
-    else
-        next->handle(frame);
+    try {
+        Id from(nodeid, pid);
+        Buffer buf(static_cast<char*>(msg), msg_len);
+        AMQFrame frame;
+        frame.decode(buf);
+        QPID_LOG(trace, *this << " RECV: " << frame << " from: " << from);
+        if (frame.getChannel() == 0)
+            handleClusterFrame(from, frame);
+        else if (from == self) {
+            FrameHandler* next;
+            buf.getRawData((uint8_t*)&next, sizeof(next));
+            next->handle(frame);
+        }
+        // FIXME aconway 2008-01-30: apply frames from foreign sessions.
+    }
+    catch (const std::exception& e) {
+        // FIXME aconway 2008-01-30: exception handling.
+        QPID_LOG(error, "Error handling frame from cluster " << e.what());
+    }
 }
 
 bool Cluster::wait(boost::function<bool(const Cluster&)> predicate,

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=617582&r1=617581&r2=617582&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Fri Feb  1 10:02:42 2008
@@ -21,7 +21,6 @@
 
 #include "Cpg.h"
 
-#include "qpid/framing/FrameHandler.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/sys/Monitor.h"
 #include "qpid/sys/Runnable.h"
@@ -39,15 +38,10 @@
 namespace qpid { namespace cluster {
 
 /**
- * Connection to the cluster. Maintains cluster membership
- * data.
- *
- * As FrameHandler, handles frames by sending them to the
- * cluster. Frames received from the cluster are sent to the next
- * FrameHandler in the chain.
+ * Connection to the cluster.
+ * Keeps cluster membership data.
  */
-class Cluster : public framing::FrameHandler,
-                private sys::Runnable, private Cpg::Handler
+class Cluster : private sys::Runnable, private Cpg::Handler
 {
   public:
     /** Details of a cluster member */
@@ -68,7 +62,7 @@
     virtual ~Cluster();
 
     // FIXME aconway 2008-01-29: 
-    //framing::HandlerUpdater& getHandlerUpdater() { return sessions; }
+    intrusive_ptr<broker::SessionManager::Observer> getObserver() { return observer; }
     
     /** Get the current cluster membership. */
     MemberList getMembers() const;
@@ -87,7 +81,7 @@
               sys::Duration timeout=sys::TIME_INFINITE) const;
 
     /** Send frame to the cluster */
-    void handle(framing::AMQFrame&);
+    void send(framing::AMQFrame&, framing::FrameHandler*);
     
   private:
     typedef Cpg::Id Id;
@@ -122,6 +116,7 @@
     MemberMap members;
     sys::Thread dispatcher;
     boost::function<void()> callback;
+    intrusive_ptr<broker::SessionManager::Observer> observer;
 
   friend std::ostream& operator <<(std::ostream&, const Cluster&);
   friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=617582&r1=617581&r2=617582&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Fri Feb  1 10:02:42 2008
@@ -69,7 +69,7 @@
             cluster = boost::in_place(options.name,
                                       options.getUrl(broker->getPort()),
                                       boost::ref(*broker));
-            // FIXME aconway 2008-02-01: Add observer.
+            broker->getSessionManager().add(cluster->getObserver());	
         }
     }
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Cpg.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Cpg.cpp?rev=617582&r1=617581&r2=617582&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Cpg.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Cpg.cpp Fri Feb  1 10:02:42 2008
@@ -78,12 +78,16 @@
         cpg_handle_t /*handle*/,
         struct cpg_name *grp,
         struct cpg_address */*members*/, int nMembers,
-        struct cpg_address */*left*/, int /*nLeft*/,
-        struct cpg_address */*joined*/, int /*nJoined*/
+        struct cpg_address */*left*/, int nLeft,
+        struct cpg_address */*joined*/, int nJoined
     )
     {
         BOOST_CHECK_EQUAL(group, Cpg::str(*grp));
         configChanges.push_back(nMembers);
+        BOOST_MESSAGE("configChange: "<<
+                      nLeft<<" left "<<
+                      nJoined<<" joined "<<
+                      nMembers<<" members.");
     }
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ais_check
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ais_check?rev=617582&r1=617581&r2=617582&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ais_check (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ais_check Fri Feb  1 10:02:42 2008
@@ -2,10 +2,12 @@
 # Check for requirements, run AIS tests if found.
 #
 
-test `id -ng` = "ais" || BADGROUP=yes
-{ ps -u root | grep aisexec >/dev/null; } || NOAISEXEC=yes
+id -nG | grep '\<ais\>' || \
+    NOGROUP="You are not a member of the ais group."
+ps -u root | grep aisexec >/dev/null || \
+    NOAISEXEC="The aisexec daemon is not running as root"
 
-if test -n "$BADGROUP" -o -n "$NOAISEXEC"; then
+if test -n "$NOGROUP" -o -n "$NOAISEXEC"; then
     cat <<EOF
 
     =========== WARNING: NOT RUNNING AIS TESTS ==============
@@ -13,18 +15,8 @@
     Tests that depend on the openais library (used for clustering)
     will not be run because:
 
-EOF
-    test -n "$BADGROUP" && cat <<EOF
-    You do not appear to have you group ID set to "ais". Make ais your
-    primary group, or run "newgrp ais" before running the tests.
-
-EOF
-    test -n "$NOAISEXEC" && cat <<EOF    
-    The aisexec daemon is not running. Make sure /etc/ais/openais.conf
-    is a valid configuration and aisexec is run by root.
-EOF
-
-    cat <<EOF
+    $NOGROUP
+    $NOAISEXEC
 
     ==========================================================
     
@@ -32,8 +24,4 @@
     exit 0;			# A warning, not a failure.
 fi
 
-FAILED=0
-for test in `cat ais_tests`; do
-    ./$test || FAILED=`expr $FAILED + 1`
-done
-exit $FAILED
+echo ./ais_run | newgrp ais

Added: incubator/qpid/trunk/qpid/cpp/src/tests/ais_run
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ais_run?rev=617582&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ais_run (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ais_run Fri Feb  1 10:02:42 2008
@@ -0,0 +1,15 @@
+#!/bin/sh
+#
+# Run AIS tests, assumes that ais_check has passed and we are
+# running with the ais group ID.
+#
+
+# FIXME aconway 2008-01-30: we should valgrind the cluster brokers.
+
+srcdir=`dirname $0`
+$srcdir/start_cluster 4
+./ais_test
+ret=$?
+$srcdir/stop_cluster 
+exit $ret
+

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/ais_run
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/qpid/trunk/qpid/cpp/src/tests/ais_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ais_test.cpp?rev=617582&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ais_test.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ais_test.cpp Fri Feb  1 10:02:42 2008
@@ -0,0 +1,23 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Defines test_main function to link with actual unit test code.
+#define BOOST_AUTO_TEST_MAIN	// Boost 1.33
+#define BOOST_TEST_MAIN
+#include "unit_test.h"
+

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/ais_test.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/ais_test.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk?rev=617582&r1=617581&r2=617582&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster.mk Fri Feb  1 10:02:42 2008
@@ -1,55 +1,20 @@
-# FIXME aconway 2007-08-31: Disabled cluster compilation,
-# has not been kept up to date with recent commits.
+if CLUSTER
+#
+# Cluster tests makefile fragment, to be included in Makefile.am
 # 
 
-# if CLUSTER
-# # Cluster tests makefile fragment, to be included in Makefile.am
-# # 
+lib_cluster = $(abs_builddir)/../libqpidcluster.la
 
-# lib_cluster = $(abs_builddir)/../libqpidcluster.la
-
-# # NOTE: Programs using the openais library must be run with gid=ais
-# # You should do "newgrp ais" before running the tests to run these.
-# # 
-
-# #
-# # Cluster tests.
-# # 
-
-# # ais_check runs ais if the conditions to run AIS tests
-# # are met, otherwise it prints a warning.
-# TESTS+=ais_check
-# EXTRA_DIST+=ais_check
-# AIS_TESTS=
-
-# ais_check: ais_tests
-# ais_tests:
-# 	echo $(AIS_TESTS)
-# 	echo "# AIS tests" >$@
-# 	for t in $(AIS_TESTS); do echo ./$$t >$@; done
-# 	chmod a+x $@
-
-# CLEANFILES+=ais_tests
-
-# AIS_TESTS+=Cpg
-# check_PROGRAMS+=Cpg
-# Cpg_SOURCES=Cpg.cpp
-# Cpg_LDADD=$(lib_cluster) -lboost_unit_test_framework
-
-# # TODO aconway 2007-07-26: Fix this test.
-# #AIS_TESTS+=Cluster
-# # check_PROGRAMS+=Cluster
-# # Cluster_SOURCES=Cluster.cpp Cluster.h
-# # Cluster_LDADD=$(lib_cluster) -lboost_unit_test_framework
-
-# check_PROGRAMS+=Cluster_child 
-# Cluster_child_SOURCES=Cluster_child.cpp Cluster.h
-# Cluster_child_LDADD=$(lib_cluster) -lboost_test_exec_monitor
+# NOTE: Programs using the openais library must be run with gid=ais
+# You should do "newgrp ais" before running the tests to run these.
+# 
 
-# # TODO aconway 2007-07-03: In progress
-# #AIS_TESTS+=cluster_client
-# check_PROGRAMS+=cluster_client
-# cluster_client_SOURCES=cluster_client.cpp
-# cluster_client_LDADD=$(lib_client) -lboost_unit_test_framework
+# ais_check checks conditions for AIS tests and runs if ok.
+TESTS+=ais_check
+EXTRA_DIST+=ais_check ais_run
+
+check_PROGRAMS+=ais_test
+ais_test_SOURCES=ais_test.cpp Cpg.cpp 
+ais_test_LDADD=$(lib_client) $(lib_cluster) -lboost_unit_test_framework
 
-# endif
+endif

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_client.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_client.cpp?rev=617582&r1=617581&r2=617582&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_client.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_client.cpp Fri Feb  1 10:02:42 2008
@@ -16,21 +16,25 @@
  *
  */
 
-#include "qpid/client/Connection.h"
-#include "qpid/shared_ptr.h"
-
 #include "unit_test.h"
+#include "BrokerFixture.h"
+#include "qpid/client/Session.h"
 
 #include <fstream>
 #include <vector>
 #include <functional>
 
-
 QPID_AUTO_TEST_SUITE(cluster_clientTestSuite)
 
-using namespace std;
 using namespace qpid;
 using namespace qpid::client;
+using namespace qpid::framing;
+using namespace qpid::client::arg;
+using framing::TransferContent;
+using std::vector;
+using std::string;
+using std::ifstream;
+using std::ws;
 
 struct ClusterConnections : public vector<shared_ptr<Connection> > {
     ClusterConnections() {
@@ -58,25 +62,23 @@
     ClusterConnections cluster;
     BOOST_REQUIRE(cluster.size() > 1);
 
-    Exchange fooEx("FooEx", Exchange::TOPIC_EXCHANGE);
-    Queue fooQ("FooQ");
-    
-    Channel broker0;
-    cluster[0]->openChannel(broker0);
-    broker0.declareExchange(fooEx);
-    broker0.declareQueue(fooQ);
-    broker0.bind(fooEx, fooQ, "FooKey");
+    Session broker0 = cluster[0]->newSession();
+    broker0.exchangeDeclare(exchange="ex");
+    broker0.queueDeclare(queue="q");
+    broker0.queueBind(exchange="ex", queue="q", routingKey="key");
     broker0.close();
     
     for (size_t i = 1; i < cluster.size(); ++i) {
-        Channel ch;
-        cluster[i]->openChannel(ch);
-        ch.publish(Message("hello"), fooEx, "FooKey");
-        Message m;
-        BOOST_REQUIRE(ch.get(m, fooQ));
-        BOOST_REQUIRE_EQUAL(m.getData(), "hello");
-        ch.close();
-    }
+        Session s = cluster[i]->newSession();
+        s.messageTransfer(content=TransferContent("data", "key", "ex"));
+        s.messageSubscribe(queue="q", destination="q");
+        s.messageFlow(destination="q", unit=0, value=1);//messages
+        FrameSet::shared_ptr msg = s.get();
+        BOOST_CHECK(msg->isA<MessageTransferBody>());
+        BOOST_CHECK_EQUAL(string("data"), msg->getContent());
+        s.getExecution().completed(msg->getId(), true, true);
+        cluster[i]->close();
+    }    
 }
 
 QPID_AUTO_TEST_SUITE_END()

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster?rev=617582&r1=617581&r2=617582&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster Fri Feb  1 10:02:42 2008
@@ -12,7 +12,7 @@
 OPTS=$*
 CLUSTER=`whoami`		# Cluster name=user name, avoid clashes.
 for (( i=0; i<SIZE; ++i )); do
-    PORT=`../qpidd -dp0 --log-output=cluster$i.log --cluster $CLUSTER $OPTS` || exit 1
+    PORT=`../qpidd --load-module ../.libs/libqpidcluster.so -dp0 --log-output=cluster$i.log --cluster-name $CLUSTER $OPTS` || exit 1
     echo $PORT >> cluster.ports
 done