You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/08/12 19:31:54 UTC

svn commit: r685237 - in /incubator/qpid/trunk/qpid/cpp/src: cluster.mk qpid/cluster/Cluster.cpp qpid/cluster/Cluster.h qpid/cluster/PollableCondition.cpp qpid/cluster/PollableCondition.h qpid/cluster/PollableQueue.h

Author: aconway
Date: Tue Aug 12 10:31:53 2008
New Revision: 685237

URL: http://svn.apache.org/viewvc?rev=685237&view=rev
Log:
Move frame processing out of CPG dispatch queue for cluster.
PollableQueue is a pollable in-memory queue, will probably move it to sys.

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/cluster.mk
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h

Modified: incubator/qpid/trunk/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/cluster.mk?rev=685237&r1=685236&r2=685237&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/cluster.mk Tue Aug 12 10:31:53 2008
@@ -16,7 +16,10 @@
   qpid/cluster/ConnectionInterceptor.cpp \
   qpid/cluster/ClassifierHandler.h \
   qpid/cluster/ClassifierHandler.cpp \
-  qpid/cluster/ShadowConnectionOutputHandler.h
+  qpid/cluster/ShadowConnectionOutputHandler.h \
+  qpid/cluster/PollableCondition.h \
+  qpid/cluster/PollableCondition.cpp \
+  qpid/cluster/PollableQueue.h
 
 libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=685237&r1=685236&r2=685237&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Aug 12 10:31:53 2008
@@ -66,7 +66,9 @@
     cpgDispatchHandle(cpg,
                       boost::bind(&Cluster::dispatch, this, _1), // read
                       0,                                         // write
-                      boost::bind(&Cluster::disconnect, this, _1)) // disconnect
+                      boost::bind(&Cluster::disconnect, this, _1) // disconnect
+    ),
+    deliverQueue(boost::bind(&Cluster::deliverFrames, this, _1, _2))
 {
     broker->addFinalizer(boost::bind(&Cluster::leave, this));
     QPID_LOG(trace, "Joining cluster: " << name_);
@@ -80,10 +82,10 @@
 
     // Start dispatching from the poller.
     cpgDispatchHandle.startWatch(poller);
+    deliverQueue.start(poller);
 }
 
-Cluster::~Cluster() {
-}
+Cluster::~Cluster() {}
 
 // local connection initializes plugins
 void Cluster::initialize(broker::Connection& c) {
@@ -181,31 +183,50 @@
         Buffer buf(static_cast<char*>(msg), msg_len);
         AMQFrame frame;
         if (!frame.decode(buf))  // Not enough data.
-            throw Exception("Received incomplete cluster event."); // FIXME aconway 2008-08-05: cluster error handling.
-        ConnectionInterceptor* connection;
+            throw Exception("Received incomplete cluster event."); // FIXME aconway 2008-08-05: error handling.
+        void* connection;
         decodePtr(buf, connection);
-        QPID_LOG(trace, "DLVR [" << from << " " << connection << "] " << frame);
-
-        if (!broker) {
-            QPID_LOG(warning, "Unexpected DLVR, already left the cluster.");
-            return;
-        }
-        if (connection && from != self) // Look up shadow for remote connections
-            connection = getShadowConnection(from, connection);
-
-        if (frame.getMethod() && frame.getMethod()->amqpClassId() == CLUSTER_CLASS_ID) 
-            handleMethod(from, connection, *frame.getMethod());
-        else 
-            connection->deliver(frame);
+        deliverQueue.push(DeliveredFrame(frame, from, connection));
     }
     catch (const std::exception& e) {
         // FIXME aconway 2008-01-30: exception handling.
-        QPID_LOG(critical, "Error in cluster delivery: " << e.what());
+        QPID_LOG(critical, "Error in cluster deliver: " << e.what());
         assert(0);
         throw;
     }
 }
 
+// Callback for deliverQueue: handles each queued frame in order; returns early if the broker is gone.
+void Cluster::deliverFrames(const PollableQueue<DeliveredFrame>::iterator& begin,
+                            const PollableQueue<DeliveredFrame>::iterator& end)
+{
+    for (PollableQueue<DeliveredFrame>::iterator i = begin; i != end; ++i) {
+        AMQFrame& frame(i->frame);
+        Id from(i->from);
+        ConnectionInterceptor* connection = static_cast<ConnectionInterceptor*>(i->connection); // stored as void*
+        try {
+            QPID_LOG(trace, "DLVR [" << from << " " << connection << "] " << frame);
+            if (!broker) {
+                QPID_LOG(warning, "Unexpected DLVR, already left the cluster.");
+                return;
+            }
+            if (connection && from != self) // Look up shadow for remote connections
+                connection = getShadowConnection(from, connection);
+
+            if (frame.getMethod() && frame.getMethod()->amqpClassId() == CLUSTER_CLASS_ID)
+                handleMethod(from, connection, *frame.getMethod());
+            else
+                connection->deliver(frame); // NOTE(review): the check above allows a null connection - confirm it cannot reach here.
+        }
+        catch (const std::exception& e) {
+            // FIXME aconway 2008-01-30: exception handling.
+            QPID_LOG(critical, "Error in cluster deliverFrame: " << e.what());
+            assert(0);
+            throw;
+        }
+    }
+}
+
 // Handle cluster methods
 // FIXME aconway 2008-07-11: Generate/template a better dispatch mechanism.
 void Cluster::handleMethod(Id from, ConnectionInterceptor* connection, AMQMethodBody& method) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=685237&r1=685236&r2=685237&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Tue Aug 12 10:31:53 2008
@@ -21,6 +21,7 @@
 
 #include "qpid/cluster/Cpg.h"
 #include "qpid/cluster/ShadowConnectionOutputHandler.h"
+#include "qpid/cluster/PollableQueue.h"
 
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/Connection.h"
@@ -96,10 +97,17 @@
     typedef std::map<Id, Member>  MemberMap;
     typedef std::map<ShadowConnectionId, ConnectionInterceptor*> ShadowConnectionMap;
 
+    struct DeliveredFrame { // One decoded CPG delivery, queued by deliver() for deliverFrames().
+        framing::AMQFrame frame; Id from; void* connection; // connection: raw pointer value decoded from the message.
+        DeliveredFrame(const framing::AMQFrame& f, const Id i, void* c)
+            : frame(f), from(i), connection(c) {}
+    };
+
     boost::function<void()> shutdownNext;
     
     void notify();              ///< Notify cluster of my details.
 
+    /** CPG deliver callback. */
     void deliver(
         cpg_handle_t /*handle*/,
         struct cpg_name *group,
@@ -108,6 +116,7 @@
         void* /*msg*/,
         int /*msg_len*/);
 
+    /** CPG config change callback */
     void configChange(
         cpg_handle_t /*handle*/,
         struct cpg_name */*group*/,
@@ -116,6 +125,10 @@
         struct cpg_address */*joined*/, int /*nJoined*/
     );
 
+    /** Callback to handle delivered frames from the deliverQueue. */
+    void deliverFrames(const PollableQueue<DeliveredFrame>::iterator& begin,
+                       const PollableQueue<DeliveredFrame>::iterator& end);
+
     void dispatch(sys::DispatchHandle&);
     void disconnect(sys::DispatchHandle&);
 
@@ -134,6 +147,7 @@
     ShadowConnectionMap shadowConnectionMap;
     ShadowConnectionOutputHandler shadowOut;
     sys::DispatchHandle cpgDispatchHandle;
+    PollableQueue<DeliveredFrame> deliverQueue;
 
   friend std::ostream& operator <<(std::ostream&, const Cluster&);
   friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&);

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.cpp?rev=685237&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.cpp Tue Aug 12 10:31:53 2008
@@ -0,0 +1,59 @@
+#ifndef QPID_SYS_LINUX_POLLABLECONDITION_CPP
+#define QPID_SYS_LINUX_POLLABLECONDITION_CPP
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+// Linux implementation of PollableCondition using the eventfd(2) system call.
+
+// FIXME aconway 2008-08-11: this could be of more general interest,
+// move to common lib.
+// 
+
+#include "qpid/sys/posix/PrivatePosix.h"
+#include "qpid/cluster/PollableCondition.h"
+#include "qpid/Exception.h"
+#include <sys/eventfd.h>
+
+namespace qpid {
+namespace cluster {
+
+PollableCondition::PollableCondition() : IOHandle(new sys::IOHandlePrivate) {
+    impl->fd = ::eventfd(0, 0); // Initial counter value 0 (condition not set), no flags.
+    if (impl->fd < 0) throw ErrnoException("eventfd() failed"); // Name the call that actually failed.
+}
+
+bool PollableCondition::clear() { // Reads the eventfd counter; returns true if it was non-zero.
+    uint64_t count = 0; // Read straight into an integer: no cast through a possibly misaligned char buffer.
+    ssize_t n = ::read(impl->fd, &count, sizeof(count));
+    if (n != static_cast<ssize_t>(sizeof(count))) throw ErrnoException("read failed on conditionfd");
+    return count != 0; // NOTE(review): fd is created without flags, so read() presumably blocks if unset - confirm.
+}
+
+void PollableCondition::set() { // Marks the condition as set by writing to the underlying fd.
+    static const uint64_t one = 1; // Value written to the fd; always exactly 8 bytes.
+    const ssize_t written = ::write(impl->fd, &one, sizeof(one));
+    if (written != 8) throw ErrnoException("write failed on conditionfd");
+}
+    
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_SYS_LINUX_POLLABLECONDITION_CPP*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.h?rev=685237&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.h Tue Aug 12 10:31:53 2008
@@ -0,0 +1,57 @@
+#ifndef QPID_SYS_POLLABLECONDITION_H
+#define QPID_SYS_POLLABLECONDITION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/IOHandle.h"
+
+// FIXME aconway 2008-08-11: this could be of more general interest,
+// move to sys namespace in common lib. 
+// 
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * A pollable condition to integrate in-process conditions with IO
+ * conditions in a polling loop.
+ *
+ * Setting the condition makes it readable for a poller.
+ * 
+ * Writable/disconnected conditions are undefined and should not be
+ * polled for.
+ */
+class PollableCondition : public sys::IOHandle {
+  public:
+    PollableCondition();
+
+    /** Set the condition, triggers readable in a poller. Throws ErrnoException if the write fails. */
+    void set();
+
+    /** Get the current state of the condition, then clear it. Throws ErrnoException if the read fails.
+     *@return The state of the condition before it was cleared.
+     */
+    bool clear();
+};
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_SYS_POLLABLECONDITION_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h?rev=685237&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h Tue Aug 12 10:31:53 2008
@@ -0,0 +1,99 @@
+#ifndef QPID_CLUSTER_POLLABLEQUEUE_H
+#define QPID_CLUSTER_POLLABLEQUEUE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/cluster/PollableCondition.h"
+#include "qpid/sys/Dispatcher.h"
+#include "qpid/sys/Mutex.h"
+#include <boost/function.hpp>
+#include <boost/bind.hpp>
+#include <deque>
+
+namespace qpid {
+
+namespace sys { class Poller; }
+
+namespace cluster {
+
+// FIXME aconway 2008-08-11: this could be of more general interest,
+// move to common lib.
+
+/**
+ * A queue that can be polled by sys::Poller.  Any thread can push to
+ * the queue, on wakeup the poller thread processes all items on the
+ * queue by passing them to a callback in a batch.
+ */
+template <class T>
+class PollableQueue {
+    typedef std::deque<T> Queue;
+
+  public:
+    typedef typename Queue::iterator iterator;
+    
+    /** Callback to process a range of items: receives [begin, end) of the current batch. */
+    typedef boost::function<void (const iterator&, const iterator&)> Callback;
+
+    /** When the queue is selected by the poller, values are passed to callback cb. */
+    explicit PollableQueue(const Callback& cb);
+
+    /** Push a value onto the queue and set the condition, both under the lock. Thread safe */
+    void push(const T& t) { ScopedLock l(lock); queue.push_back(t); condition.set(); }
+
+    /** Start polling: begins watching the internal dispatch handle on the given poller. */
+    void start(const boost::shared_ptr<sys::Poller>& poller) { handle.startWatch(poller); }
+
+    /** Stop polling: stops watching the internal dispatch handle. */
+    void stop() { handle.stopWatch(); }
+    
+  private:
+    typedef sys::Mutex::ScopedLock ScopedLock;
+    typedef sys::Mutex::ScopedUnlock ScopedUnlock;
+
+    void dispatch(sys::DispatchHandle&); // Read callback: passes the queued items to callback as one batch.
+    
+    sys::Mutex lock;            // Taken by push() and dispatch() around access to queue.
+    Callback callback;          // User callback invoked with each batch.
+    PollableCondition condition; // Set by push(), cleared by dispatch(); must be declared before handle.
+    sys::DispatchHandle handle; // Watches condition and invokes dispatch().
+    Queue queue;                // Items pushed but not yet handed to the callback.
+    Queue batch;                // Items currently being processed by the callback.
+};
+
+template <class T> PollableQueue<T>::PollableQueue(const Callback& cb) // Stores cb; dispatch() is the only handler.
+    : callback(cb),
+      handle(condition, boost::bind(&PollableQueue<T>::dispatch, this, _1), 0, 0) // read callback; no write/disconnect.
+{}
+
+template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) {
+    ScopedLock l(lock);         // Lock for concurrent push() 
+    batch.clear();              // Drop whatever is left of the previous batch.
+    batch.swap(queue);          // Take everything pushed so far; queue is left empty.
+    condition.clear();          // Reset the condition while still holding the lock.
+    ScopedUnlock u(lock);       // Presumably releases the lock until u goes out of scope - confirm in sys::Mutex.
+    callback(batch.begin(), batch.end()); // Process the batch outside the lock.
+    h.rewatch();                // Re-arm the handle; skipped if the callback throws.
+}
+
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_POLLABLEQUEUE_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date