You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/08/12 19:31:54 UTC
svn commit: r685237 - in /incubator/qpid/trunk/qpid/cpp/src: cluster.mk
qpid/cluster/Cluster.cpp qpid/cluster/Cluster.h
qpid/cluster/PollableCondition.cpp qpid/cluster/PollableCondition.h
qpid/cluster/PollableQueue.h
Author: aconway
Date: Tue Aug 12 10:31:53 2008
New Revision: 685237
URL: http://svn.apache.org/viewvc?rev=685237&view=rev
Log:
Move frame processing out of CPG dispatch queue for cluster.
PollableQueue is a pollable in-memory queue, will probably move it to sys.
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h (with props)
Modified:
incubator/qpid/trunk/qpid/cpp/src/cluster.mk
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
Modified: incubator/qpid/trunk/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/cluster.mk?rev=685237&r1=685236&r2=685237&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/cluster.mk Tue Aug 12 10:31:53 2008
@@ -16,7 +16,10 @@
qpid/cluster/ConnectionInterceptor.cpp \
qpid/cluster/ClassifierHandler.h \
qpid/cluster/ClassifierHandler.cpp \
- qpid/cluster/ShadowConnectionOutputHandler.h
+ qpid/cluster/ShadowConnectionOutputHandler.h \
+ qpid/cluster/PollableCondition.h \
+ qpid/cluster/PollableCondition.cpp \
+ qpid/cluster/PollableQueue.h
libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=685237&r1=685236&r2=685237&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Aug 12 10:31:53 2008
@@ -66,7 +66,9 @@
cpgDispatchHandle(cpg,
boost::bind(&Cluster::dispatch, this, _1), // read
0, // write
- boost::bind(&Cluster::disconnect, this, _1)) // disconnect
+ boost::bind(&Cluster::disconnect, this, _1) // disconnect
+ ),
+ deliverQueue(boost::bind(&Cluster::deliverFrames, this, _1, _2))
{
broker->addFinalizer(boost::bind(&Cluster::leave, this));
QPID_LOG(trace, "Joining cluster: " << name_);
@@ -80,10 +82,10 @@
// Start dispatching from the poller.
cpgDispatchHandle.startWatch(poller);
+ deliverQueue.start(poller);
}
-Cluster::~Cluster() {
-}
+Cluster::~Cluster() {}
// local connection initializes plugins
void Cluster::initialize(broker::Connection& c) {
@@ -181,31 +183,50 @@
Buffer buf(static_cast<char*>(msg), msg_len);
AMQFrame frame;
if (!frame.decode(buf)) // Not enough data.
- throw Exception("Received incomplete cluster event."); // FIXME aconway 2008-08-05: cluster error handling.
- ConnectionInterceptor* connection;
+ throw Exception("Received incomplete cluster event."); // FIXME aconway 2008-08-05: error handling.
+ void* connection;
decodePtr(buf, connection);
- QPID_LOG(trace, "DLVR [" << from << " " << connection << "] " << frame);
-
- if (!broker) {
- QPID_LOG(warning, "Unexpected DLVR, already left the cluster.");
- return;
- }
- if (connection && from != self) // Look up shadow for remote connections
- connection = getShadowConnection(from, connection);
-
- if (frame.getMethod() && frame.getMethod()->amqpClassId() == CLUSTER_CLASS_ID)
- handleMethod(from, connection, *frame.getMethod());
- else
- connection->deliver(frame);
+ deliverQueue.push(DeliveredFrame(frame, from, connection));
}
catch (const std::exception& e) {
// FIXME aconway 2008-01-30: exception handling.
- QPID_LOG(critical, "Error in cluster delivery: " << e.what());
+ QPID_LOG(critical, "Error in cluster deliver: " << e.what());
assert(0);
throw;
}
}
+void Cluster::deliverFrames(const PollableQueue<DeliveredFrame>::iterator& begin,
+ const PollableQueue<DeliveredFrame>::iterator& end)
+{
+ for (PollableQueue<DeliveredFrame>::iterator i = begin; i != end; ++i) {
+ AMQFrame& frame(i->frame);
+ Id from(i->from);
+ ConnectionInterceptor* connection = reinterpret_cast<ConnectionInterceptor*>(i->connection);
+ try {
+ QPID_LOG(trace, "DLVR [" << from << " " << connection << "] " << frame);
+
+ if (!broker) {
+ QPID_LOG(warning, "Unexpected DLVR, already left the cluster.");
+ return;
+ }
+ if (connection && from != self) // Look up shadow for remote connections
+ connection = getShadowConnection(from, connection);
+
+ if (frame.getMethod() && frame.getMethod()->amqpClassId() == CLUSTER_CLASS_ID)
+ handleMethod(from, connection, *frame.getMethod());
+ else
+ connection->deliver(frame);
+ }
+ catch (const std::exception& e) {
+ // FIXME aconway 2008-01-30: exception handling.
+ QPID_LOG(critical, "Error in cluster deliverFrame: " << e.what());
+ assert(0);
+ throw;
+ }
+ }
+}
+
// Handle cluster methods
// FIXME aconway 2008-07-11: Generate/template a better dispatch mechanism.
void Cluster::handleMethod(Id from, ConnectionInterceptor* connection, AMQMethodBody& method) {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=685237&r1=685236&r2=685237&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Tue Aug 12 10:31:53 2008
@@ -21,6 +21,7 @@
#include "qpid/cluster/Cpg.h"
#include "qpid/cluster/ShadowConnectionOutputHandler.h"
+#include "qpid/cluster/PollableQueue.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Connection.h"
@@ -96,10 +97,17 @@
typedef std::map<Id, Member> MemberMap;
typedef std::map<ShadowConnectionId, ConnectionInterceptor*> ShadowConnectionMap;
+ struct DeliveredFrame {
+ framing::AMQFrame frame; Id from; void* connection;
+ DeliveredFrame(const framing::AMQFrame& f, const Id i, void* c)
+ : frame(f), from(i), connection(c) {}
+ };
+
boost::function<void()> shutdownNext;
void notify(); ///< Notify cluster of my details.
+ /** CPG deliver callback. */
void deliver(
cpg_handle_t /*handle*/,
struct cpg_name *group,
@@ -108,6 +116,7 @@
void* /*msg*/,
int /*msg_len*/);
+ /** CPG config change callback */
void configChange(
cpg_handle_t /*handle*/,
struct cpg_name */*group*/,
@@ -116,6 +125,10 @@
struct cpg_address */*joined*/, int /*nJoined*/
);
+ /** Callback to handle delivered frames from the deliverQueue. */
+ void deliverFrames(const PollableQueue<DeliveredFrame>::iterator& begin,
+ const PollableQueue<DeliveredFrame>::iterator& end);
+
void dispatch(sys::DispatchHandle&);
void disconnect(sys::DispatchHandle&);
@@ -134,6 +147,7 @@
ShadowConnectionMap shadowConnectionMap;
ShadowConnectionOutputHandler shadowOut;
sys::DispatchHandle cpgDispatchHandle;
+ PollableQueue<DeliveredFrame> deliverQueue;
friend std::ostream& operator <<(std::ostream&, const Cluster&);
friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&);
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.cpp?rev=685237&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.cpp Tue Aug 12 10:31:53 2008
@@ -0,0 +1,59 @@
+#ifndef QPID_SYS_LINUX_POLLABLECONDITION_CPP
+#define QPID_SYS_LINUX_POLLABLECONDITION_CPP
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+// Linux implementation of PollableCondition using the conditionfd(2) system call.
+
+// FIXME aconway 2008-08-11: this could be of more general interest,
+// move to common lib.
+//
+
+#include "qpid/sys/posix/PrivatePosix.h"
+#include "qpid/cluster/PollableCondition.h"
+#include "qpid/Exception.h"
+#include <sys/eventfd.h>
+
+namespace qpid {
+namespace cluster {
+
+PollableCondition::PollableCondition() : IOHandle(new sys::IOHandlePrivate) {
+ impl->fd = ::eventfd(0, 0);
+ if (impl->fd < 0) throw ErrnoException("conditionfd() failed");
+}
+
+bool PollableCondition::clear() {
+ char buf[8];
+ ssize_t n = ::read(impl->fd, buf, 8);
+ if (n != 8) throw ErrnoException("read failed on conditionfd");
+ return *reinterpret_cast<uint64_t*>(buf);
+}
+
+void PollableCondition::set() {
+ static const uint64_t value=1;
+ ssize_t n = ::write(impl->fd, reinterpret_cast<const void*>(&value), 8);
+ if (n != 8) throw ErrnoException("write failed on conditionfd");
+}
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_SYS_LINUX_POLLABLECONDITION_CPP*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.h?rev=685237&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.h Tue Aug 12 10:31:53 2008
@@ -0,0 +1,57 @@
+#ifndef QPID_SYS_POLLABLECONDITION_H
+#define QPID_SYS_POLLABLECONDITION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/IOHandle.h"
+
+// FIXME aconway 2008-08-11: this could be of more general interest,
+// move to sys namespace in common lib.
+//
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * A pollable condition to integrate in-process conditions with IO
+ * conditions in a polling loop.
+ *
+ * Setting the condition makes it readable for a poller.
+ *
+ * Writable/disconnected conditions are undefined and should not be
+ * polled for.
+ */
+class PollableCondition : public sys::IOHandle {
+ public:
+ PollableCondition();
+
+ /** Set the condition, triggers readable in a poller. */
+ void set();
+
+ /** Get the current state of the condition, then clear it.
+ *@return The state of the condition before it was cleared.
+ */
+ bool clear();
+};
+}} // namespace qpid::cluster
+
+#endif /*!QPID_SYS_POLLABLECONDITION_H*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h?rev=685237&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h Tue Aug 12 10:31:53 2008
@@ -0,0 +1,99 @@
+#ifndef QPID_CLUSTER_POLLABLEQUEUE_H
+#define QPID_CLUSTER_POLLABLEQUEUE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/cluster/PollableCondition.h"
+#include "qpid/sys/Dispatcher.h"
+#include "qpid/sys/Mutex.h"
+#include <boost/function.hpp>
+#include <boost/bind.hpp>
+#include <deque>
+
+namespace qpid {
+
+namespace sys { class Poller; }
+
+namespace cluster {
+
+// FIXME aconway 2008-08-11: this could be of more general interest,
+// move to common lib.
+
+/**
+ * A queue that can be polled by sys::Poller. Any thread can push to
+ * the queue, on wakeup the poller thread processes all items on the
+ * queue by passing them to a callback in a batch.
+ */
+template <class T>
+class PollableQueue {
+ typedef std::deque<T> Queue;
+
+ public:
+ typedef typename Queue::iterator iterator;
+
+ /** Callback to process a range of items. */
+ typedef boost::function<void (const iterator&, const iterator&)> Callback;
+
+ /** When the queue is selected by the poller, values are passed to callback cb. */
+ explicit PollableQueue(const Callback& cb);
+
+ /** Push a value onto the queue. Thread safe */
+ void push(const T& t) { ScopedLock l(lock); queue.push_back(t); condition.set(); }
+
+ /** Start polling. */
+ void start(const boost::shared_ptr<sys::Poller>& poller) { handle.startWatch(poller); }
+
+ /** Stop polling. */
+ void stop() { handle.stopWatch(); }
+
+ private:
+ typedef sys::Mutex::ScopedLock ScopedLock;
+ typedef sys::Mutex::ScopedUnlock ScopedUnlock;
+
+ void dispatch(sys::DispatchHandle&);
+
+ sys::Mutex lock;
+ Callback callback;
+ PollableCondition condition;
+ sys::DispatchHandle handle;
+ Queue queue;
+ Queue batch;
+};
+
+template <class T> PollableQueue<T>::PollableQueue(const Callback& cb) // FIXME aconway 2008-08-12:
+ : callback(cb),
+ handle(condition, boost::bind(&PollableQueue<T>::dispatch, this, _1), 0, 0)
+{}
+
+template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) {
+ ScopedLock l(lock); // Lock for concurrent push()
+ batch.clear();
+ batch.swap(queue);
+ condition.clear();
+ ScopedUnlock u(lock);
+ callback(batch.begin(), batch.end()); // Process the batch outside the lock.
+ h.rewatch();
+}
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_POLLABLEQUEUE_H*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h
------------------------------------------------------------------------------
svn:keywords = Rev Date