You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/08/11 20:41:43 UTC

svn commit: r684865 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/broker/ qpid/cluster/ tests/

Author: aconway
Date: Mon Aug 11 11:41:42 2008
New Revision: 684865

URL: http://svn.apache.org/viewvc?rev=684865&view=rev
Log:
Integrate CPG file descriptor into broker polling.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=684865&r1=684864&r2=684865&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Mon Aug 11 11:41:42 2008
@@ -43,16 +43,12 @@
 include $(srcdir)/rubygen.mk
 include $(srcdir)/managementgen.mk
 
-DISTCLEANFILES=$(srcdir)/rubygen.mk $(srcdir)/managementgen.mk
-
 # Code generated by C++
 noinst_PROGRAMS=generate_MaxMethodBodySize_h
 generate_MaxMethodBodySize_h_SOURCES=gen/generate_MaxMethodBodySize_h.cpp
 qpid/framing/MaxMethodBodySize.h: generate_MaxMethodBodySize_h
 	./generate_MaxMethodBodySize_h
 BUILT_SOURCES=qpid/framing/MaxMethodBodySize.h
-DISTCLEANFILES+=qpid/framing/MaxMethodBodySize.h
-
 
 ## Compiler flags
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=684865&r1=684864&r2=684865&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Mon Aug 11 11:41:42 2008
@@ -395,5 +395,7 @@
     connect(addr.host, addr.port, false, failed, f);
 }
 
+boost::shared_ptr<sys::Poller> Broker::getPoller() { return poller; }
+
 }} // namespace qpid::broker
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=684865&r1=684864&r2=684865&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Mon Aug 11 11:41:42 2008
@@ -177,6 +177,8 @@
     // For the present just return the first ProtocolFactory registered.
     boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory() const;
 
+    /** Expose poller so plugins can register their descriptors. */
+    boost::shared_ptr<sys::Poller> getPoller(); 
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=684865&r1=684864&r2=684865&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Mon Aug 11 11:41:42 2008
@@ -28,6 +28,7 @@
 #include "qpid/log/Statement.h"
 #include "qpid/memory.h"
 #include "qpid/shared_ptr.h"
+
 #include <boost/bind.hpp>
 #include <boost/cast.hpp>
 #include <algorithm>
@@ -57,25 +58,32 @@
 
 Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
     broker(&b),
+    poller(b.getPoller()),
     cpg(*this),
     name(name_),
     url(url_),
-    self(cpg.self())
+    self(cpg.self()),
+    cpgDispatchHandle(cpg,
+                      boost::bind(&Cluster::dispatch, this, _1), // read
+                      0,                                         // write
+                      boost::bind(&Cluster::disconnect, this, _1)) // disconnect
 {
     broker->addFinalizer(boost::bind(&Cluster::leave, this));
     QPID_LOG(trace, "Joining cluster: " << name_);
     cpg.join(name);
     notify();
-    dispatcher=Thread(*this);
-    // Wait till we show up in the cluster map.
-    {
-        Mutex::ScopedLock l(lock);
-        while (empty())
-            lock.wait();
-    }
+
+    // FIXME aconway 2008-08-11: can we remove this loop?
+    // Dispatch till we show up in the cluster map.
+    while (empty()) 
+        cpg.dispatchOne();
+
+    // Start dispatching from the poller.
+    cpgDispatchHandle.startWatch(poller);
 }
 
-Cluster::~Cluster() {}
+Cluster::~Cluster() {
+}
 
 // local connection initializes plugins
 void Cluster::initialize(broker::Connection& c) {
@@ -87,14 +95,19 @@
 void Cluster::leave() {
     Mutex::ScopedLock l(lock);
     if (!broker) return;                               // Already left.
-    assert(Thread::current().id() != dispatcher.id()); // Must not be called in the dispatch thread.
+    // At this point the poller has already been shut down so
+    // no dispatches can occur thru the cpgDispatchHandle.
+    // 
+    // FIXME aconway 2008-08-11: assert this is the cae.
+    
     QPID_LOG(debug, "Leaving cluster " << *this);
     cpg.leave(name);
-    // The dispatch thread sets broker=0 when the final config-change
-    // is delivered.
-    while(broker) lock.wait();
+    // broker= is set to 0 when the final config-change is delivered.
+    while(broker) {
+        Mutex::ScopedUnlock u(lock);
+        cpg.dispatchAll();
+    }
     cpg.shutdown();
-    dispatcher.join();
 }
 
 template <class T> void decodePtr(Buffer& buf, T*& ptr) {
@@ -134,6 +147,13 @@
     return result;        
 }
 
+// ################ HERE - leaking shadow connections.
+// FIXME aconway 2008-08-11: revisit memory management for shadow
+// connections, what if the Connection is closed other than via
+// disconnect? Dangling pointer in shadow map. Use ptr_map for shadow
+// map, add deleted state to ConnectionInterceptor? Interceptors need
+// to know about map? Check how Connections can be deleted.
+
 ConnectionInterceptor* Cluster::getShadowConnection(const Cpg::Id& member, void* remotePtr) {
     ShadowConnectionId id(member, remotePtr);
     ShadowConnectionMap::iterator i = shadowConnectionMap.find(id);
@@ -233,8 +253,16 @@
     lock.notifyAll();     // Threads waiting for membership changes.  
 }
 
-void Cluster::run() {
-    cpg.dispatchBlocking();
+void Cluster::dispatch(sys::DispatchHandle& h) {
+    cpg.dispatchAll();
+    h.rewatch();
+}
+
+void Cluster::disconnect(sys::DispatchHandle& h) {
+    h.stopWatch();
+    // FIXME aconway 2008-08-11: error handling if we are disconnected. 
+    // Kill the broker?
+    assert(0);
 }
 
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=684865&r1=684864&r2=684865&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Mon Aug 11 11:41:42 2008
@@ -24,6 +24,7 @@
 
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/Connection.h"
+#include "qpid/sys/Dispatcher.h"
 #include "qpid/sys/Monitor.h"
 #include "qpid/sys/Runnable.h"
 #include "qpid/sys/Thread.h"
@@ -47,7 +48,7 @@
  * Connection to the cluster.
  * Keeps cluster membership data.
  */
-class Cluster : private sys::Runnable, private Cpg::Handler, public RefCounted
+class Cluster : private Cpg::Handler, public RefCounted
 {
   public:
     typedef boost::tuple<Cpg::Id, void*> ShadowConnectionId;
@@ -115,7 +116,8 @@
         struct cpg_address */*joined*/, int /*nJoined*/
     );
 
-    void run();
+    void dispatch(sys::DispatchHandle&);
+    void disconnect(sys::DispatchHandle&);
 
     void handleMethod(Id from, ConnectionInterceptor* connection, framing::AMQMethodBody& method);
 
@@ -123,14 +125,15 @@
 
     mutable sys::Monitor lock;  // Protect access to members.
     broker::Broker* broker;
+    boost::shared_ptr<sys::Poller> poller;
     Cpg cpg;
     Cpg::Name name;
     Url url;
     MemberMap members;
-    sys::Thread dispatcher;
     Id self;
     ShadowConnectionMap shadowConnectionMap;
     ShadowConnectionOutputHandler shadowOut;
+    sys::DispatchHandle cpgDispatchHandle;
 
   friend std::ostream& operator <<(std::ostream&, const Cluster&);
   friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=684865&r1=684864&r2=684865&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Mon Aug 11 11:41:42 2008
@@ -18,7 +18,6 @@
 
 #include "ConnectionInterceptor.h"
 
-
 #include "qpid/broker/Broker.h"
 #include "qpid/cluster/Cluster.h"
 #include "qpid/Plugin.h"

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp?rev=684865&r1=684864&r2=684865&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp Mon Aug 11 11:41:42 2008
@@ -52,6 +52,7 @@
 }
 
 void ConnectionInterceptor::deliver(framing::AMQFrame& f) {
+    //    ostringstream os; os << f; printf("Received: %s\n", os.str().c_str()); // FIXME aconway 2008-08-08: remove
     receivedNext(f);
 }
 
@@ -83,16 +84,21 @@
 
 bool  ConnectionInterceptor::doOutput() {
     if (connection->hasOutput()) {
-        printf("doOutput send %p\n", (void*)this);
+        QPID_LOG(debug, "Intercept doOutput, call doOutputNext"); // FIXME aconway 2008-08-08: remove
         cluster.send(AMQFrame(in_place<ClusterConnectionDoOutputBody>()), this);
-    } 
-
+        return doOutputNext();
+    }
     return false;
 }
 
 void ConnectionInterceptor::deliverDoOutput() {
-    printf("doOutput deliver %p\n", (void*)this);
-    doOutputNext();
+    if (isShadow()) {
+        QPID_LOG(debug, "Shadow deliver do output, call doOutputNext"); // FIXME aconway 2008-08-08: remove
+        doOutputNext();
+    }
+    else {
+        QPID_LOG(debug, "Primary deliver doOutput, ignore."); // FIXME aconway 2008-08-08: remove
+    }
 }
 
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h?rev=684865&r1=684864&r2=684865&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h Mon Aug 11 11:41:42 2008
@@ -55,6 +55,8 @@
         void doOutput() {}
         void activateOutput() {}
     };
+
+    bool isShadow() { return shadowId != Cluster::ShadowConnectionId(0,0); }
     
     // Functions to intercept to Connection extension points.
     void received(framing::AMQFrame&);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp?rev=684865&r1=684864&r2=684865&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp Mon Aug 11 11:41:42 2008
@@ -17,8 +17,9 @@
  */
 
 #include "Cpg.h"
-
 #include "qpid/sys/Mutex.h"
+// Note cpg is currently unix-specific. Refactor if availble on other platforms.
+#include "qpid/sys/posix/PrivatePosix.h"
 #include "qpid/log/Statement.h"
 
 #include <vector>
@@ -62,11 +63,21 @@
     cpgFromHandle(handle)->handler.configChange(handle, group, members, nMembers, left, nLeft, joined, nJoined);
 }
 
-Cpg::Cpg(Handler& h) : handler(h), isShutdown(false) {
+int Cpg::getFd() {
+    int fd;
+    check(cpg_fd_get(handle, &fd), "Cannot get CPG file descriptor");
+    return fd;
+}
+
+Cpg::Cpg(Handler& h) : IOHandle(new sys::IOHandlePrivate), handler(h), isShutdown(false) {
     cpg_callbacks_t callbacks = { &globalDeliver, &globalConfigChange };
     check(cpg_initialize(&handle, &callbacks), "Cannot initialize CPG");
     check(cpg_context_set(handle, this), "Cannot set CPG context");
-    QPID_LOG(debug, "Initialize CPG handle 0x" << std::hex << handle);
+    // Note: CPG is currently unix-specific. If CPG is ported to
+    // windows then this needs to be refactored into
+    // qpid::sys::<platform>
+    IOHandle::impl->fd = getFd();
+    QPID_LOG(debug, "Initialized CPG handle 0x" << std::hex << handle);
 }
 
 Cpg::~Cpg() {
@@ -93,6 +104,7 @@
 
 // TODO aconway 2008-08-07: better handling of flow control.
 // Wait for flow control to be disabled.
+// FIXME aconway 2008-08-08: does flow control check involve a round-trip? If so maybe remove...
 void Cpg::waitForFlowControl() {
     int delayNs=1000;           // one millisecond
     int tries=8;                // double the delay on each try.
@@ -178,7 +190,6 @@
     return out << string(name.value, name.length);
 }
 
-
 }} // namespace qpid::cluster
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h?rev=684865&r1=684864&r2=684865&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h Mon Aug 11 11:41:42 2008
@@ -20,11 +20,15 @@
  */
 
 #include "qpid/Exception.h"
+#include "qpid/sys/IOHandle.h"
 #include "qpid/cluster/Dispatchable.h"
 
 #include <boost/tuple/tuple.hpp>
 #include <boost/tuple/tuple_comparison.hpp>
+#include <boost/scoped_ptr.hpp>
+
 #include <cassert>
+
 #include <string.h>
 
 extern "C" {
@@ -34,14 +38,15 @@
 namespace qpid {
 namespace cluster {
 
+
 /**
- * Lightweight C++ interface to cpg.h operations. 
+ * Lightweight C++ interface to cpg.h operations.
+ * 
  * Manages a single CPG handle, initialized in ctor, finialzed in destructor.
- * On error all functions throw Cpg::Exception
+ * On error all functions throw Cpg::Exception.
  *
- * NOTE: only one at a time can exist per process.
  */
-class Cpg : public Dispatchable {
+class Cpg : public sys::IOHandle {
   public:
     struct Exception : public ::qpid::Exception {
         Exception(const std::string& msg) : ::qpid::Exception(msg) {}
@@ -60,7 +65,6 @@
         std::string str() const { return std::string(value, length); }
     };
 
-
     // boost::tuple gives us == and < for free.
     struct Id : public boost::tuple<uint32_t, uint32_t>  {
         Id(uint32_t n=0, uint32_t p=0) : boost::tuple<uint32_t, uint32_t>(n, p) {}
@@ -125,11 +129,13 @@
 
     Id self() const;
 
+    int getFd();
+    
   private:
     static std::string errorStr(cpg_error_t err, const std::string& msg);
     static std::string cantJoinMsg(const Name&);
     static std::string cantLeaveMsg(const Name&); std::string cantMcastMsg(const Name&);
-    
+
     static void check(cpg_error_t result, const std::string& msg) {
         if (result != CPG_OK) throw Exception(errorStr(result, msg));
     }
@@ -172,5 +178,4 @@
 }} // namespace qpid::cluster
 
 
-
 #endif  /*!CPG_H*/

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=684865&r1=684864&r2=684865&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Mon Aug 11 11:41:42 2008
@@ -162,28 +162,6 @@
     }
 };
 
-#if 0                           // FIXME aconway 2008-08-06: 
-
-QPID_AUTO_TEST_CASE(CpgBasic) {
-    // Verify basic functionality of cpg. This will catch any
-    // openais configuration or permission errors.
-    //
-    Cpg::Name group("CpgBasic");
-    Callback cb(group.str());
-    Cpg cpg(cb);
-    cpg.join(group);
-    iovec iov = { (void*)"Hello!", 6 };
-    cpg.mcast(group, &iov, 1);
-    cpg.leave(group);
-    cpg.dispatchSome();
-
-    BOOST_REQUIRE_EQUAL(1u, cb.delivered.size());
-    BOOST_CHECK_EQUAL("Hello!", cb.delivered.front());
-    BOOST_REQUIRE_EQUAL(2u, cb.configChanges.size());
-    BOOST_CHECK_EQUAL(1, cb.configChanges[0]);
-    BOOST_CHECK_EQUAL(0, cb.configChanges[1]);
-}
-
 QPID_AUTO_TEST_CASE(testForkedBroker) {
     // Verify the ForkedBroker works as expected.
     const char* argv[] = { "", "--auth=no", "--no-data-dir", "--log-prefix=testForkedBroker" };
@@ -250,7 +228,7 @@
     BOOST_CHECK_EQUAL(0u, c1.session.queueQuery("q").getMessageCount());
     BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount());
 }
-#endif
+
 QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) {
     ClusterFixture cluster(3);
     // First start a subscription.
@@ -276,6 +254,5 @@
     BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount());
 }
 
-// TODO aconway 2008-06-25: failover.
 
 QPID_AUTO_TEST_SUITE_END()