You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2011/02/22 19:23:07 UTC
svn commit: r1073448 [1/2] - in /qpid/branches/qpid-2920/qpid/cpp:
design_docs/ src/ src/qpid/broker/ src/qpid/cluster/ src/qpid/cluster/exp/
src/tests/ xml/
Author: aconway
Date: Tue Feb 22 18:23:06 2011
New Revision: 1073448
URL: http://svn.apache.org/viewvc?rev=1073448&view=rev
Log:
QPID-2920: First cut experimental prototype for new cluster.
Experimental code to investigate & measure performance of new cluster design ideas.
Experimental classes are in src/qpid/cluster/exp.
New broker::Cluster interface provides call points for cluster. Similar to
store but has more operations, may be merged at a future point.
Added:
qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Cluster.h (with props)
qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/NullCluster.h (with props)
qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/
qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.cpp (with props)
qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.h (with props)
qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp (with props)
qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/Core.cpp (with props)
qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/Core.h (with props)
qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp (with props)
qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/EventHandler.h (with props)
qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/HandlerBase.cpp
- copied, changed from r1073372, qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/QueuedMessage.h
qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h
- copied, changed from r1073372, qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/QueuedMessage.h
qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/LockedMap.h (with props)
qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp (with props)
qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h (with props)
qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/README.txt (with props)
qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp (with props)
qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h (with props)
qpid/branches/qpid-2920/qpid/cpp/src/tests/BrokerClusterCalls.cpp (with props)
qpid/branches/qpid-2920/qpid/cpp/src/tests/cluster2_tests.py (with props)
Modified:
qpid/branches/qpid-2920/qpid/cpp/design_docs/new-cluster-plan.txt
qpid/branches/qpid-2920/qpid/cpp/src/Makefile.am
qpid/branches/qpid-2920/qpid/cpp/src/cluster.mk
qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Broker.cpp
qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Broker.h
qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Exchange.cpp
qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/QueuedMessage.h
qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/RecoverableExchange.h
qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp
qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/PollerDispatch.h
qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/types.h
qpid/branches/qpid-2920/qpid/cpp/src/tests/Makefile.am
qpid/branches/qpid-2920/qpid/cpp/src/tests/brokertest.py
qpid/branches/qpid-2920/qpid/cpp/src/tests/cluster.mk
qpid/branches/qpid-2920/qpid/cpp/src/tests/run_cluster_tests
qpid/branches/qpid-2920/qpid/cpp/src/tests/test_env.sh.in
qpid/branches/qpid-2920/qpid/cpp/xml/cluster.xml
Modified: qpid/branches/qpid-2920/qpid/cpp/design_docs/new-cluster-plan.txt
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/design_docs/new-cluster-plan.txt?rev=1073448&r1=1073447&r2=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/design_docs/new-cluster-plan.txt (original)
+++ qpid/branches/qpid-2920/qpid/cpp/design_docs/new-cluster-plan.txt Tue Feb 22 18:23:06 2011
@@ -146,10 +146,6 @@ reject(QueuedMessage):
isRejecting = true
mcast reject(qmsg)
-# FIXME no longer needed?
-drop(QueuedMessage)
- cleanup(qmsg)
-
*** MessageHandler and mcast messages
Types:
- struct QueueEntry { QueuedMessage qmsg; NodeId acquired; }
@@ -348,6 +344,9 @@ For 0-10 can use channel numbers & send
Extend broker::Cluster interface to capture transaction context and completion.
Sequence number to generate per-node tx IDs.
Replicate transaction completion.
+** TODO [#B] Management support
+- Replicate management methods that modify queues - e.g. move, purge.
+- Report connections - local only or cluster-wide?
** TODO [#B] Batch CPG multicast messages
The new cluster design involves a lot of small multicast messages,
they need to be batched into larger CPG messages for efficiency.
Modified: qpid/branches/qpid-2920/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/Makefile.am?rev=1073448&r1=1073447&r2=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/Makefile.am (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/Makefile.am Tue Feb 22 18:23:06 2011
@@ -517,6 +517,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/Broker.cpp \
qpid/broker/Broker.h \
qpid/broker/BrokerImportExport.h \
+ qpid/broker/Cluster.h \
qpid/broker/Connection.cpp \
qpid/broker/Connection.h \
qpid/broker/ConnectionFactory.cpp \
@@ -586,6 +587,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/PriorityQueue.cpp \
qpid/broker/NameGenerator.cpp \
qpid/broker/NameGenerator.h \
+ qpid/broker/NullCluster.h \
qpid/broker/NullMessageStore.cpp \
qpid/broker/NullMessageStore.h \
qpid/broker/OwnershipToken.h \
Modified: qpid/branches/qpid-2920/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/cluster.mk?rev=1073448&r1=1073447&r2=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/cluster.mk (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/cluster.mk Tue Feb 22 18:23:06 2011
@@ -35,7 +35,6 @@ endif
if HAVE_LIBCPG
dmodule_LTLIBRARIES += cluster.la
-
cluster_la_SOURCES = \
$(CMAN_SOURCES) \
qpid/cluster/Cluster.cpp \
@@ -101,6 +100,30 @@ cluster_la_LIBADD= -lcpg $(libcman) lib
cluster_la_CXXFLAGS = $(AM_CXXFLAGS) -fno-strict-aliasing
cluster_la_LDFLAGS = $(PLUGINLDFLAGS)
+# Experimental new cluster plugin
+dmodule_LTLIBRARIES += cluster2.la
+cluster2_la_LIBADD = -lcpg libqpidbroker.la
+cluster2_la_LDFLAGS = $(PLUGINLDFLAGS)
+cluster2_la_SOURCES = \
+ qpid/cluster/Cpg.cpp \
+ qpid/cluster/Cpg.h \
+ qpid/cluster/PollerDispatch.cpp \
+ qpid/cluster/PollerDispatch.h \
+ qpid/cluster/exp/BrokerHandler.cpp \
+ qpid/cluster/exp/BrokerHandler.h \
+ qpid/cluster/exp/Cluster2Plugin.cpp \
+ qpid/cluster/exp/Core.cpp \
+ qpid/cluster/exp/Core.h \
+ qpid/cluster/exp/EventHandler.cpp \
+ qpid/cluster/exp/EventHandler.h \
+ qpid/cluster/exp/HandlerBase.cpp \
+ qpid/cluster/exp/HandlerBase.h \
+ qpid/cluster/exp/MessageHandler.cpp \
+ qpid/cluster/exp/MessageHandler.h \
+ qpid/cluster/exp/WiringHandler.cpp \
+ qpid/cluster/exp/WiringHandler.h
+
+
# The watchdog plugin and helper executable
dmodule_LTLIBRARIES += watchdog.la
watchdog_la_SOURCES = qpid/cluster/WatchDogPlugin.cpp
Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1073448&r1=1073447&r2=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Broker.cpp Tue Feb 22 18:23:06 2011
@@ -25,6 +25,7 @@
#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/HeadersExchange.h"
#include "qpid/broker/MessageStoreModule.h"
+#include "qpid/broker/NullCluster.h"
#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/RecoveryManagerImpl.h"
#include "qpid/broker/SaslAuthenticator.h"
@@ -174,6 +175,7 @@ Broker::Broker(const Broker::Options& co
conf.qmf2Support)
: 0),
store(new NullMessageStore),
+ cluster(new NullCluster),
acl(0),
dataDir(conf.noDataDir ? std::string() : conf.dataDir),
queues(this),
@@ -758,7 +760,6 @@ void Broker::setClusterTimer(std::auto_p
const std::string Broker::TCP_TRANSPORT("tcp");
-
std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue(
const std::string& name,
bool durable,
@@ -819,10 +820,11 @@ std::pair<boost::shared_ptr<Queue>, bool
void Broker::deleteQueue(const std::string& name, const std::string& userId,
const std::string& connectionId, QueueFunctor check)
{
- if (acl && !acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_QUEUE,name,NULL)) {
- throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue delete request from " << userId));
+ if ((userId.size() || connectionId.size()) && // Skip ACL check if ID is empty.
+ acl && !acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_QUEUE,name,NULL)) {
+ throw framing::UnauthorizedAccessException(
+ QPID_MSG("ACL denied queue delete request from " << userId));
}
-
Queue::shared_ptr queue = queues.find(name);
if (queue) {
if (check) check(queue);
@@ -886,6 +888,7 @@ std::pair<Exchange::shared_ptr, bool> Br
ManagementAgent::toMap(arguments),
"created"));
}
+ getCluster().create(*result.first);
}
return result;
}
@@ -893,8 +896,8 @@ std::pair<Exchange::shared_ptr, bool> Br
void Broker::deleteExchange(const std::string& name, const std::string& userId,
const std::string& connectionId)
{
- if (acl) {
- if (!acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,NULL) )
+ if ((userId.size() || connectionId.size()) && // Skip ACL check if ID is empty.
+ acl && !acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,NULL)) {
throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange delete request from " << userId));
}
@@ -907,7 +910,7 @@ void Broker::deleteExchange(const std::s
if (managementAgent.get())
managementAgent->raiseEvent(_qmf::EventExchangeDelete(connectionId, userId, name));
-
+ getCluster().destroy(*exchange);
}
void Broker::bind(const std::string& queueName,
@@ -939,6 +942,7 @@ void Broker::bind(const std::string& que
queueName, key, ManagementAgent::toMap(arguments)));
}
}
+ getCluster().bind(*queue, *exchange, key, arguments);
}
}
@@ -955,7 +959,6 @@ void Broker::unbind(const std::string& q
if (!acl->authorise(userId,acl::ACT_UNBIND,acl::OBJ_EXCHANGE,exchangeName,¶ms) )
throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange unbind request from " << userId));
}
-
Queue::shared_ptr queue = queues.find(queueName);
Exchange::shared_ptr exchange = exchanges.get(exchangeName);
if (!queue) {
@@ -965,14 +968,19 @@ void Broker::unbind(const std::string& q
} else {
if (exchange->unbind(queue, key, 0)) {
if (exchange->isDurable() && queue->isDurable()) {
- store->unbind(*exchange, *queue, key, qpid::framing::FieldTable());
+ store->unbind(*exchange, *queue, key, framing::FieldTable());
}
if (managementAgent.get()) {
managementAgent->raiseEvent(_qmf::EventUnbind(connectionId, userId, exchangeName, queueName, key));
}
+ getCluster().unbind(*queue, *exchange, key, framing::FieldTable());
}
}
}
+void Broker::setCluster(std::auto_ptr<Cluster> c) { cluster = c; }
+
+Cluster& Broker::getCluster() { return *cluster; }
+
}} // namespace qpid::broker
Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Broker.h?rev=1073448&r1=1073447&r2=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Broker.h Tue Feb 22 18:23:06 2011
@@ -72,6 +72,7 @@ namespace broker {
class ConnectionState;
class ExpiryPolicy;
class Message;
+class Cluster;
static const uint16_t DEFAULT_PORT=5672;
@@ -164,6 +165,7 @@ public:
std::auto_ptr<management::ManagementAgent> managementAgent;
ProtocolFactoryMap protocolFactories;
std::auto_ptr<MessageStore> store;
+ std::auto_ptr<Cluster> cluster;
AclModule* acl;
DataDir dataDir;
@@ -293,6 +295,9 @@ public:
bool isClusterUpdatee() const { return clusterUpdatee; }
void setClusterUpdatee(bool set) { clusterUpdatee = set; }
+ QPID_BROKER_EXTERN void setCluster(std::auto_ptr<Cluster> c);
+ QPID_BROKER_EXTERN Cluster& getCluster();
+
management::ManagementAgent* getManagementAgent() { return managementAgent.get(); }
ConnectionCounter& getConnectionCounter() {return connectionCounter;}
Added: qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Cluster.h?rev=1073448&view=auto
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Cluster.h (added)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Cluster.h Tue Feb 22 18:23:06 2011
@@ -0,0 +1,104 @@
+#ifndef QPID_BROKER_CLUSTER_H
+#define QPID_BROKER_CLUSTER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/intrusive_ptr.hpp>
+
+namespace qpid {
+
+namespace framing {
+class FieldTable;
+}
+
+namespace broker {
+
+class Message;
+struct QueuedMessage;
+class Queue;
+class Exchange;
+
+/**
+ * NOTE: this is part of an experimental cluster implementation that is not
+ * yet fully functional. The original cluster implementation remains in place.
+ * See ../cluster/new-cluster-design.txt
+ *
+ * Interface for cluster implementations. Functions on this interface are
+ * called at relevant points in the Broker's processing.
+ */
+class Cluster
+{
+ public:
+ virtual ~Cluster() {}
+
+ // Messages
+
+ /** In Exchange::route, before the message is enqueued. */
+ virtual void routing(const boost::intrusive_ptr<Message>&) = 0;
+
+ /** A message is delivered to a queue.
+ * Called before actually pushing the message to the queue.
+ *@return If true the message should be pushed to the queue now.
+ * otherwise the cluster code will push the message when it is replicated.
+ */
+ virtual bool enqueue(Queue& queue, const boost::intrusive_ptr<Message>&) = 0;
+
+ /** In Exchange::route, after all enqueues for the message. */
+ virtual void routed(const boost::intrusive_ptr<Message>&) = 0;
+
+ /** A message is acquired by a local consumer, it is unavailable to replicas. */
+ virtual void acquire(const QueuedMessage&) = 0;
+
+ /** A locally-acquired message is released by the consumer and re-queued. */
+ virtual void release(const QueuedMessage&) = 0;
+
+ /** A message is removed from the queue. */
+ virtual void dequeue(const QueuedMessage&) = 0;
+
+ // Consumers
+
+ /** A new consumer subscribes to a queue. */
+ virtual void consume(const Queue&, size_t consumerCount) = 0;
+ /** A consumer cancels its subscription to a queue */
+ virtual void cancel(const Queue&, size_t consumerCount) = 0;
+
+ // Wiring
+
+ /** A queue is created */
+ virtual void create(const Queue&) = 0;
+ /** A queue is destroyed */
+ virtual void destroy(const Queue&) = 0;
+ /** An exchange is created */
+ virtual void create(const Exchange&) = 0;
+ /** An exchange is destroyed */
+ virtual void destroy(const Exchange&) = 0;
+ /** A binding is created */
+ virtual void bind(const Queue&, const Exchange&,
+ const std::string& key, const framing::FieldTable& args) = 0;
+ /** A binding is removed */
+ virtual void unbind(const Queue&, const Exchange&,
+ const std::string& key, const framing::FieldTable& args) = 0;
+};
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_CLUSTER_H*/
Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Cluster.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Cluster.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=1073448&r1=1073447&r2=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Exchange.cpp Tue Feb 22 18:23:06 2011
@@ -23,6 +23,7 @@
#include "qpid/broker/ExchangeRegistry.h"
#include "qpid/broker/FedOps.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/Cluster.h"
#include "qpid/management/ManagementAgent.h"
#include "qpid/broker/Queue.h"
#include "qpid/log/Statement.h"
@@ -70,10 +71,23 @@ Exchange::PreRoute::~PreRoute(){
}
}
+// Bracket a scope with calls to Cluster::routing and Cluster::routed
+struct ScopedClusterRouting {
+ Broker* broker;
+ boost::intrusive_ptr<Message> message;
+ ScopedClusterRouting(Broker* b, boost::intrusive_ptr<Message> m)
+ : broker(b), message(m) {
+ if (broker) broker->getCluster().routing(message);
+ }
+ ~ScopedClusterRouting() {
+ if (broker) broker->getCluster().routed(message);
+ }
+};
+
void Exchange::doRoute(Deliverable& msg, ConstBindingList b)
{
+ ScopedClusterRouting scr(broker, &msg.getMessage());
int count = 0;
-
if (b.get()) {
// Block the content release if the message is transient AND there is more than one binding
if (!msg.getMessage().isPersistent() && b->size() > 1) {
Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp?rev=1073448&r1=1073447&r2=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp Tue Feb 22 18:23:06 2011
@@ -24,6 +24,9 @@
#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/HeadersExchange.h"
#include "qpid/broker/TopicExchange.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/Cluster.h"
+#include "qpid/log/Statement.h"
#include "qpid/management/ManagementDirectExchange.h"
#include "qpid/management/ManagementTopicExchange.h"
#include "qpid/framing/reply_exceptions.h"
@@ -87,11 +90,15 @@ void ExchangeRegistry::destroy(const str
}
Exchange::shared_ptr ExchangeRegistry::get(const string& name){
+ Exchange::shared_ptr ex = find(name);
+ if (!ex) throw framing::NotFoundException(QPID_MSG("Exchange not found: " << name));
+ return ex;
+}
+
+Exchange::shared_ptr ExchangeRegistry::find(const string& name){
RWlock::ScopedRlock locker(lock);
ExchangeMap::iterator i = exchanges.find(name);
- if (i == exchanges.end())
- throw framing::NotFoundException(QPID_MSG("Exchange not found: " << name));
- return i->second;
+ return (i == exchanges.end()) ? Exchange::shared_ptr() : i->second;
}
bool ExchangeRegistry::registerExchange(const Exchange::shared_ptr& ex) {
Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/ExchangeRegistry.h?rev=1073448&r1=1073447&r2=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/ExchangeRegistry.h (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/ExchangeRegistry.h Tue Feb 22 18:23:06 2011
@@ -55,6 +55,7 @@ class ExchangeRegistry{
const qpid::framing::FieldTable& args = framing::FieldTable());
QPID_BROKER_EXTERN void destroy(const std::string& name);
QPID_BROKER_EXTERN Exchange::shared_ptr get(const std::string& name);
+ QPID_BROKER_EXTERN Exchange::shared_ptr find(const std::string& name);
Exchange::shared_ptr getDefault();
/**
Added: qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/NullCluster.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/NullCluster.h?rev=1073448&view=auto
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/NullCluster.h (added)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/NullCluster.h Tue Feb 22 18:23:06 2011
@@ -0,0 +1,66 @@
+#ifndef QPID_BROKER_NULLCLUSTER_H
+#define QPID_BROKER_NULLCLUSTER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/broker/Cluster.h>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * No-op implementation of Cluster interface, installed by broker when
+ * no cluster plug-in is present or clustering is disabled.
+ */
+class NullCluster : public Cluster
+{
+ public:
+
+ // Messages
+
+ virtual void routing(const boost::intrusive_ptr<Message>&) {}
+ virtual bool enqueue(Queue&, const boost::intrusive_ptr<Message>&) { return true; }
+ virtual void routed(const boost::intrusive_ptr<Message>&) {}
+ virtual void acquire(const QueuedMessage&) {}
+ virtual void release(const QueuedMessage&) {}
+ virtual void dequeue(const QueuedMessage&) {}
+
+ // Consumers
+
+ virtual void consume(const Queue&, size_t) {}
+ virtual void cancel(const Queue&, size_t) {}
+
+ // Wiring
+
+ virtual void create(const Queue&) {}
+ virtual void destroy(const Queue&) {}
+ virtual void create(const Exchange&) {}
+ virtual void destroy(const Exchange&) {}
+ virtual void bind(const Queue&, const Exchange&,
+ const std::string&, const framing::FieldTable&) {}
+ virtual void unbind(const Queue&, const Exchange&,
+ const std::string&, const framing::FieldTable&) {}
+};
+
+}} // namespace qpid::broker
+
+#endif
Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/NullCluster.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/NullCluster.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1073448&r1=1073447&r2=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Queue.cpp Tue Feb 22 18:23:06 2011
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,6 +20,7 @@
*/
#include "qpid/broker/Broker.h"
+#include "qpid/broker/Cluster.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueEvents.h"
#include "qpid/broker/Exchange.h"
@@ -65,7 +66,7 @@ using std::mem_fun;
namespace _qmf = qmf::org::apache::qpid::broker;
-namespace
+namespace
{
const std::string qpidMaxSize("qpid.max_size");
const std::string qpidMaxCount("qpid.max_count");
@@ -87,16 +88,16 @@ const int ENQUEUE_ONLY=1;
const int ENQUEUE_AND_DEQUEUE=2;
}
-Queue::Queue(const string& _name, bool _autodelete,
+Queue::Queue(const string& _name, bool _autodelete,
MessageStore* const _store,
const OwnershipToken* const _owner,
Manageable* parent,
Broker* b) :
- name(_name),
+ name(_name),
autodelete(_autodelete),
store(_store),
- owner(_owner),
+ owner(_owner),
consumerCount(0),
exclusive(0),
noLocal(false),
@@ -152,6 +153,10 @@ void Queue::deliver(boost::intrusive_ptr
// Check for deferred delivery in a cluster.
if (broker && broker->deferDelivery(name, msg))
return;
+ // Same thing but for the new cluster interface.
+ if (broker && !broker->getCluster().enqueue(*this, msg))
+ return;
+
if (msg->isImmediate() && getConsumerCount() == 0) {
if (alternateExchange) {
DeliverableMessage deliverable(msg);
@@ -179,9 +184,9 @@ void Queue::recover(boost::intrusive_ptr
if (policy.get()) policy->recoverEnqueued(msg);
push(msg, true);
- if (store){
+ if (store){
// setup synclist for recovered messages, so they don't get re-stored on lastNodeFailure
- msg->addToSyncList(shared_from_this(), store);
+ msg->addToSyncList(shared_from_this(), store);
}
msg->enqueueComplete(); // mark the message as enqueued
@@ -207,14 +212,14 @@ void Queue::process(boost::intrusive_ptr
void Queue::requeue(const QueuedMessage& msg){
assertClusterSafe();
QueueListeners::NotificationSet copy;
- {
+ {
Mutex::ScopedLock locker(messageLock);
if (!isEnqueued(msg)) return;
msg.payload->enqueueComplete(); // mark the message as enqueued
messages->reinsert(msg);
listeners.populate(copy);
- // for persistLastNode - don't force a message twice to disk, but force it if no force before
+ // for persistLastNode - don't force a message twice to disk, but force it if no force before
if(inLastNodeFailure && persistLastNode && !msg.payload->isStoredOnQueue(shared_from_this())) {
msg.payload->forcePersistent();
if (msg.payload->isForcedPersistent() ){
@@ -223,16 +228,33 @@ void Queue::requeue(const QueuedMessage&
}
}
}
+ if (broker) broker->getCluster().release(msg);
copy.notify();
}
-bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
+// Inform the cluster of an acquired message on exit from a function
+// that does the acquiring. ClusterAcquireOnExit is declared *before*
+// any locks are taken. The calling function sets qmsg to the acquired
+// message with a lock held, but the call to Cluster::acquire() will
+// be outside the lock.
+struct ClusterAcquireOnExit {
+ Broker* broker;
+ QueuedMessage qmsg;
+ ClusterAcquireOnExit(Broker* b) : broker(b) {}
+ ~ClusterAcquireOnExit() {
+ if (broker && qmsg.queue) broker->getCluster().acquire(qmsg);
+ }
+};
+
+bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
{
+ ClusterAcquireOnExit willAcquire(broker); // Outside lock
Mutex::ScopedLock locker(messageLock);
assertClusterSafe();
QPID_LOG(debug, "Attempting to acquire message at " << position);
if (messages->remove(position, message)) {
QPID_LOG(debug, "Acquired message at " << position << " from " << name);
+ willAcquire.qmsg = message;
return true;
} else {
QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position");
@@ -270,7 +292,7 @@ bool Queue::getNextMessage(QueuedMessage
case NO_MESSAGES:
default:
return false;
- }
+ }
} else {
return browseNextMessage(m, c);
}
@@ -279,8 +301,9 @@ bool Queue::getNextMessage(QueuedMessage
Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
{
while (true) {
+ ClusterAcquireOnExit willAcquire(broker); // Outside the lock
Mutex::ScopedLock locker(messageLock);
- if (messages->empty()) {
+ if (messages->empty()) {
QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
listeners.addListener(c);
return NO_MESSAGES;
@@ -293,8 +316,9 @@ Queue::ConsumeCode Queue::consumeNextMes
}
if (c->filter(msg.payload)) {
- if (c->accept(msg.payload)) {
+ if (c->accept(msg.payload)) {
m = msg;
+ willAcquire.qmsg = msg;
pop();
return CONSUMED;
} else {
@@ -306,7 +330,7 @@ Queue::ConsumeCode Queue::consumeNextMes
//consumer will never want this message
QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
return CANT_CONSUME;
- }
+ }
}
}
}
@@ -360,7 +384,7 @@ bool Queue::dispatch(Consumer::shared_pt
}
}
-// Find the next message
+// Find the next message
bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) {
Mutex::ScopedLock locker(messageLock);
if (messages->next(c->position, msg)) {
@@ -379,42 +403,51 @@ QueuedMessage Queue::find(SequenceNumber
return msg;
}
-void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
+void Queue::consume(Consumer::shared_ptr c, bool requestExclusive) {
assertClusterSafe();
- Mutex::ScopedLock locker(consumerLock);
- if(exclusive) {
- throw ResourceLockedException(
- QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
- } else if(requestExclusive) {
- if(consumerCount) {
+ size_t consumers;
+ {
+ Mutex::ScopedLock locker(consumerLock);
+ if(exclusive) {
throw ResourceLockedException(
- QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
- } else {
- exclusive = c->getSession();
+ QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
+ } else if(requestExclusive) {
+ if(consumerCount) {
+ throw ResourceLockedException(
+ QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
+ } else {
+ exclusive = c->getSession();
+ }
}
+ consumers = ++consumerCount;
}
- consumerCount++;
if (mgmtObject != 0)
mgmtObject->inc_consumerCount ();
//reset auto deletion timer if necessary
if (autoDeleteTimeout && autoDeleteTask) {
autoDeleteTask->cancel();
}
+ if (broker) broker->getCluster().consume(*this, consumers);
}
void Queue::cancel(Consumer::shared_ptr c){
removeListener(c);
- Mutex::ScopedLock locker(consumerLock);
- consumerCount--;
- if(exclusive) exclusive = 0;
- if (mgmtObject != 0)
- mgmtObject->dec_consumerCount ();
+ size_t consumers;
+ {
+ Mutex::ScopedLock locker(consumerLock);
+ consumers = --consumerCount;
+ if(exclusive) exclusive = 0;
+ if (mgmtObject != 0)
+ mgmtObject->dec_consumerCount ();
+ }
+ if (broker) broker->getCluster().cancel(*this, consumers);
}
QueuedMessage Queue::get(){
+ ClusterAcquireOnExit willAcquire(broker); // Outside lock
Mutex::ScopedLock locker(messageLock);
QueuedMessage msg(this);
- messages->pop(msg);
+ if (messages->pop(msg)) willAcquire.qmsg = msg;
return msg;
}
@@ -432,7 +465,7 @@ void Queue::purgeExpired()
{
//As expired messages are discarded during dequeue also, only
//bother explicitly expiring if the rate of dequeues since last
- //attempt is less than one per second.
+ //attempt is less than one per second.
if (dequeueTracker.sampleRatePerSecond() < 1) {
std::deque<QueuedMessage> expired;
@@ -459,7 +492,7 @@ void Queue::purgeExpired()
uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest)
{
Mutex::ScopedLock locker(messageLock);
- uint32_t purge_count = purge_request; // only comes into play if >0
+ uint32_t purge_count = purge_request; // only comes into play if >0
std::deque<DeliverableMessage> rerouteQueue;
uint32_t count = 0;
@@ -493,7 +526,7 @@ uint32_t Queue::purge(const uint32_t pur
uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) {
Mutex::ScopedLock locker(messageLock);
- uint32_t move_count = qty; // only comes into play if qty >0
+ uint32_t move_count = qty; // only comes into play if qty >0
uint32_t count = 0; // count how many were moved for returning
while((!qty || move_count--) && !messages->empty()) {
@@ -516,14 +549,15 @@ void Queue::pop()
void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
assertClusterSafe();
+ QueuedMessage qm;
QueueListeners::NotificationSet copy;
QueuedMessage removed;
bool dequeueRequired = false;
{
- Mutex::ScopedLock locker(messageLock);
- QueuedMessage qm(this, msg, ++sequence);
+ Mutex::ScopedLock locker(messageLock);
+ qm = QueuedMessage(this, msg, ++sequence);
if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence);
-
+
dequeueRequired = messages->push(qm, removed);
listeners.populate(copy);
enqueued(qm);
@@ -602,7 +636,7 @@ void Queue::setLastNodeFailure()
}
-// return true if store exists,
+// return true if store exists,
bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg, bool suppressPolicyCheck)
{
ScopedUse u(barrier);
@@ -616,13 +650,13 @@ bool Queue::enqueue(TransactionContext*
policy->getPendingDequeues(dequeues);
}
//depending on policy, may have some dequeues that need to performed without holding the lock
- for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
+ for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
}
if (inLastNodeFailure && persistLastNode){
msg->forcePersistent();
}
-
+
if (traceId.size()) {
//copy on write: take deep copy of message before modifying it
//as the frames may already be available for delivery on other
@@ -650,22 +684,20 @@ bool Queue::enqueue(TransactionContext*
void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg)
{
Mutex::ScopedLock locker(messageLock);
- if (policy.get()) policy->enqueueAborted(msg);
+ if (policy.get()) policy->enqueueAborted(msg);
}
-// return true if store exists,
+// return true if store exists,
bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
{
ScopedUse u(barrier);
if (!u.acquired) return false;
-
{
Mutex::ScopedLock locker(messageLock);
if (!isEnqueued(msg)) return false;
- if (!ctxt) {
- dequeued(msg);
- }
+ if (!ctxt) dequeued(msg);
}
+ if (!ctxt && broker) broker->getCluster().dequeue(msg); // Outside lock
// This check prevents messages which have been forced persistent on one queue from dequeuing
// from another on which no forcing has taken place and thus causing a store error.
bool fp = msg.payload->isForcedPersistent();
@@ -682,8 +714,9 @@ bool Queue::dequeue(TransactionContext*
void Queue::dequeueCommitted(const QueuedMessage& msg)
{
+ if (broker) broker->getCluster().dequeue(msg); // Outside lock
Mutex::ScopedLock locker(messageLock);
- dequeued(msg);
+ dequeued(msg);
if (mgmtObject != 0) {
mgmtObject->inc_msgTxnDequeues();
mgmtObject->inc_byteTxnDequeues(msg.payload->contentSize());
@@ -726,6 +759,7 @@ void Queue::create(const FieldTable& _se
store->create(*this, _settings);
}
configure(_settings);
+ if (broker) broker->getCluster().create(*this);
}
@@ -738,8 +772,8 @@ int getIntegerSetting(const qpid::framin
return v->get<int>();
} else if (v->convertsTo<std::string>()){
std::string s = v->get<std::string>();
- try {
- return boost::lexical_cast<int>(s);
+ try {
+ return boost::lexical_cast<int>(s);
} catch(const boost::bad_lexical_cast&) {
QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << s);
return 0;
@@ -757,8 +791,7 @@ void Queue::configure(const FieldTable&
if (eventMode && broker) {
broker->getQueueEvents().observe(*this, eventMode == ENQUEUE_ONLY);
}
-
- if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK &&
+ if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK &&
(!store || NullMessageStore::isNullStore(store) || (broker && !(broker->getQueueEvents().isSync())) )) {
if ( NullMessageStore::isNullStore(store)) {
QPID_LOG(warning, "Flow to disk not valid for non-persisted queue:" << getName());
@@ -796,7 +829,7 @@ void Queue::configure(const FieldTable&
QPID_LOG(debug, "Configured queue " << getName() << " as priority queue.");
}
}
-
+
persistLastNode= _settings.get(qpidPersistLastNode);
if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node for: " << getName());
@@ -805,15 +838,15 @@ void Queue::configure(const FieldTable&
if (excludeList.size()) {
split(traceExclude, excludeList, ", ");
}
- QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId
+ QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId
<< "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << traceExclude.size() << " elements");
FieldTable::ValuePtr p =_settings.get(qpidInsertSequenceNumbers);
if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>());
autoDeleteTimeout = getIntegerSetting(_settings, qpidAutoDeleteTimeout);
- if (autoDeleteTimeout)
- QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout);
+ if (autoDeleteTimeout)
+ QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout);
if (mgmtObject != 0) {
mgmtObject->set_arguments(ManagementAgent::toMap(_settings));
@@ -847,6 +880,7 @@ void Queue::destroyed()
}
if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>();
notifyDeleted();
+ if (broker) broker->getCluster().destroy(*this);
}
void Queue::notifyDeleted()
@@ -881,9 +915,9 @@ const QueuePolicy* Queue::getPolicy()
return policy.get();
}
-uint64_t Queue::getPersistenceId() const
-{
- return persistenceId;
+uint64_t Queue::getPersistenceId() const
+{
+ return persistenceId;
}
void Queue::setPersistenceId(uint64_t _persistenceId) const
@@ -897,11 +931,11 @@ void Queue::setPersistenceId(uint64_t _p
persistenceId = _persistenceId;
}
-void Queue::encode(Buffer& buffer) const
+void Queue::encode(Buffer& buffer) const
{
buffer.putShortString(name);
buffer.put(settings);
- if (policy.get()) {
+ if (policy.get()) {
buffer.put(*policy);
}
buffer.putShortString(alternateExchange.get() ? alternateExchange->getName() : std::string(""));
@@ -953,7 +987,7 @@ boost::shared_ptr<Exchange> Queue::getAl
void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue)
{
- if (broker.getQueues().destroyIf(queue->getName(),
+ if (broker.getQueues().destroyIf(queue->getName(),
boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) {
QPID_LOG(debug, "Auto-deleting " << queue->getName());
queue->destroyed();
@@ -965,7 +999,7 @@ struct AutoDeleteTask : qpid::sys::Timer
Broker& broker;
Queue::shared_ptr queue;
- AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime)
+ AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime)
: qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion"), broker(b), queue(q) {}
void fire()
@@ -983,27 +1017,27 @@ void Queue::tryAutoDelete(Broker& broker
if (queue->autoDeleteTimeout && queue->canAutoDelete()) {
AbsTime time(now(), Duration(queue->autoDeleteTimeout * TIME_SEC));
queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, time));
- broker.getClusterTimer().add(queue->autoDeleteTask);
+ broker.getClusterTimer().add(queue->autoDeleteTask);
QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " initiated");
} else {
tryAutoDeleteImpl(broker, queue);
}
}
-bool Queue::isExclusiveOwner(const OwnershipToken* const o) const
-{
+bool Queue::isExclusiveOwner(const OwnershipToken* const o) const
+{
Mutex::ScopedLock locker(ownershipLock);
- return o == owner;
+ return o == owner;
}
-void Queue::releaseExclusiveOwnership()
-{
+void Queue::releaseExclusiveOwnership()
+{
Mutex::ScopedLock locker(ownershipLock);
- owner = 0;
+ owner = 0;
}
-bool Queue::setExclusiveOwner(const OwnershipToken* const o)
-{
+bool Queue::setExclusiveOwner(const OwnershipToken* const o)
+{
//reset auto deletion timer if necessary
if (autoDeleteTimeout && autoDeleteTask) {
autoDeleteTask->cancel();
@@ -1012,25 +1046,25 @@ bool Queue::setExclusiveOwner(const Owne
if (owner) {
return false;
} else {
- owner = o;
+ owner = o;
return true;
}
}
-bool Queue::hasExclusiveOwner() const
-{
+bool Queue::hasExclusiveOwner() const
+{
Mutex::ScopedLock locker(ownershipLock);
- return owner != 0;
+ return owner != 0;
}
-bool Queue::hasExclusiveConsumer() const
-{
- return exclusive;
+bool Queue::hasExclusiveConsumer() const
+{
+ return exclusive;
}
void Queue::setExternalQueueStore(ExternalQueueStore* inst) {
- if (externalQueueStore!=inst && externalQueueStore)
- delete externalQueueStore;
+ if (externalQueueStore!=inst && externalQueueStore)
+ delete externalQueueStore;
externalQueueStore = inst;
if (inst) {
Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/QueuedMessage.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/QueuedMessage.h?rev=1073448&r1=1073447&r2=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/QueuedMessage.h (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/QueuedMessage.h Tue Feb 22 18:23:06 2011
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -34,13 +34,12 @@ struct QueuedMessage
framing::SequenceNumber position;
Queue* queue;
- QueuedMessage() : queue(0) {}
- QueuedMessage(Queue* q, boost::intrusive_ptr<Message> msg, framing::SequenceNumber sn) :
+ QueuedMessage(Queue* q=0) : position(0), queue(q) {}
+ QueuedMessage(Queue* q, boost::intrusive_ptr<Message> msg, framing::SequenceNumber sn) :
payload(msg), position(sn), queue(q) {}
- QueuedMessage(Queue* q) : queue(q) {}
-
+
};
- inline bool operator<(const QueuedMessage& a, const QueuedMessage& b) { return a.position < b.position; }
+ inline bool operator<(const QueuedMessage& a, const QueuedMessage& b) { return a.position < b.position; }
}}
Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/RecoverableExchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/RecoverableExchange.h?rev=1073448&r1=1073447&r2=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/RecoverableExchange.h (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/RecoverableExchange.h Tue Feb 22 18:23:06 2011
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -44,6 +44,8 @@ public:
const std::string& routingKey,
qpid::framing::FieldTable& args) = 0;
virtual ~RecoverableExchange() {};
+
+ virtual const std::string& getName() const = 0;
};
}}
Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?rev=1073448&r1=1073447&r2=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Tue Feb 22 18:23:06 2011
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -62,7 +62,7 @@ class RecoverableQueueImpl : public Reco
public:
RecoverableQueueImpl(const boost::shared_ptr<Queue>& _queue) : queue(_queue) {}
~RecoverableQueueImpl() {};
- void setPersistenceId(uint64_t id);
+ void setPersistenceId(uint64_t id);
uint64_t getPersistenceId() const;
const std::string& getName() const;
void setExternalQueueStore(ExternalQueueStore* inst);
@@ -80,6 +80,7 @@ public:
RecoverableExchangeImpl(Exchange::shared_ptr _exchange, QueueRegistry& _queues) : exchange(_exchange), queues(_queues) {}
void setPersistenceId(uint64_t id);
void bind(const std::string& queue, const std::string& routingKey, qpid::framing::FieldTable& args);
+ const std::string& getName() const;
};
class RecoverableConfigImpl : public RecoverableConfig
@@ -133,7 +134,7 @@ RecoverableMessage::shared_ptr RecoveryM
return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message));
}
-RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid,
+RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid,
std::auto_ptr<TPCTransactionContext> txn)
{
DtxBuffer::shared_ptr buffer(new DtxBuffer());
@@ -202,7 +203,7 @@ void RecoverableQueueImpl::setPersistenc
{
queue->setPersistenceId(id);
}
-
+
uint64_t RecoverableQueueImpl::getPersistenceId() const
{
return queue->getPersistenceId();
@@ -212,7 +213,7 @@ const std::string& RecoverableQueueImpl:
{
return queue->getName();
}
-
+
void RecoverableQueueImpl::setExternalQueueStore(ExternalQueueStore* inst)
{
queue->setExternalQueueStore(inst);
@@ -245,6 +246,11 @@ void RecoverableExchangeImpl::bind(const
queue->bound(exchange->getName(), key, args);
}
+const std::string& RecoverableExchangeImpl::getName() const
+{
+ return exchange->getName();
+}
+
void RecoverableMessageImpl::dequeue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue)
{
buffer->enlist(TxOp::shared_ptr(new RecoveredDequeue(queue, msg)));
Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp?rev=1073448&r1=1073447&r2=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp Tue Feb 22 18:23:06 2011
@@ -37,9 +37,11 @@ PollerDispatch::PollerDispatch(Cpg& c, b
started(false)
{}
-PollerDispatch::~PollerDispatch() {
- if (started)
- dispatchHandle.stopWatch();
+PollerDispatch::~PollerDispatch() { stop(); }
+
+void PollerDispatch::stop() {
+ if (started) dispatchHandle.stopWatch();
+ started = false;
}
void PollerDispatch::start() {
@@ -54,6 +56,7 @@ void PollerDispatch::dispatch(sys::Dispa
h.rewatch();
} catch (const std::exception& e) {
QPID_LOG(critical, "Error in cluster dispatch: " << e.what());
+ stop();
onError();
}
}
Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/PollerDispatch.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/PollerDispatch.h?rev=1073448&r1=1073447&r2=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/PollerDispatch.h (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/PollerDispatch.h Tue Feb 22 18:23:06 2011
@@ -41,6 +41,7 @@ class PollerDispatch {
~PollerDispatch();
void start();
+ void stop();
private:
// Poller callbacks
Added: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.cpp?rev=1073448&view=auto
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.cpp (added)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.cpp Tue Feb 22 18:23:06 2011
@@ -0,0 +1,154 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Core.h"
+#include "BrokerHandler.h"
+#include "qpid/framing/ClusterMessageRoutingBody.h"
+#include "qpid/framing/ClusterMessageRoutedBody.h"
+#include "qpid/framing/ClusterMessageEnqueueBody.h"
+#include "qpid/framing/ClusterMessageDequeueBody.h"
+#include "qpid/framing/ClusterWiringCreateQueueBody.h"
+#include "qpid/framing/ClusterWiringCreateExchangeBody.h"
+#include "qpid/framing/ClusterWiringDestroyQueueBody.h"
+#include "qpid/framing/ClusterWiringDestroyExchangeBody.h"
+#include "qpid/framing/ClusterWiringBindBody.h"
+#include "qpid/framing/ClusterWiringUnbindBody.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/broker/QueuedMessage.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace cluster {
+
+using namespace framing;
+using namespace broker;
+
+namespace {
+// noReplicate means the current thread is handling a message
+// received from the cluster so it should not be replciated.
+QPID_TSS bool tssNoReplicate = false;
+
+// Routing ID of the message being routed in the current thread.
+// 0 if we are not currently routing a message.
+QPID_TSS RoutingId tssRoutingId = 0;
+}
+
+BrokerHandler::ScopedSuppressReplication::ScopedSuppressReplication() {
+ assert(!tssNoReplicate);
+ tssNoReplicate = true;
+}
+
+BrokerHandler::ScopedSuppressReplication::~ScopedSuppressReplication() {
+ assert(tssNoReplicate);
+ tssNoReplicate = false;
+}
+
+BrokerHandler::BrokerHandler(Core& c) : core(c) {}
+
+RoutingId BrokerHandler::nextRoutingId() {
+ RoutingId id = ++routingId;
+ if (id == 0) id = ++routingId; // Avoid 0 on wrap-around.
+ return id;
+}
+
+void BrokerHandler::routing(const boost::intrusive_ptr<Message>&) { }
+
+bool BrokerHandler::enqueue(Queue& queue, const boost::intrusive_ptr<Message>& msg)
+{
+ if (tssNoReplicate) return true;
+ if (!tssRoutingId) { // This is the first enqueue, so send the message
+ tssRoutingId = nextRoutingId();
+ // FIXME aconway 2010-10-20: replicate message in fixed size buffers.
+ std::string data(msg->encodedSize(),char());
+ framing::Buffer buf(&data[0], data.size());
+ msg->encode(buf);
+ core.mcast(ClusterMessageRoutingBody(ProtocolVersion(), tssRoutingId, data));
+ core.getRoutingMap().put(tssRoutingId, msg);
+ }
+ core.mcast(ClusterMessageEnqueueBody(ProtocolVersion(), tssRoutingId, queue.getName()));
+ // TODO aconway 2010-10-21: configable option for strict (wait
+ // for CPG deliver to do local deliver) vs. loose (local deliver
+ // immediately).
+ return false;
+}
+
+void BrokerHandler::routed(const boost::intrusive_ptr<Message>&) {
+ if (tssRoutingId) { // we enqueued at least one message.
+ core.mcast(ClusterMessageRoutedBody(ProtocolVersion(), tssRoutingId));
+ // Note: routingMap is cleaned up on CPG delivery in MessageHandler.
+ tssRoutingId = 0;
+ }
+}
+
+void BrokerHandler::dequeue(const broker::QueuedMessage& qm) {
+ if (tssNoReplicate) return;
+ // FIXME aconway 2010-10-28: we also need to delay completion of the
+ // ack that caused this dequeue until self-delivery of the mcast below.
+ core.mcast(ClusterMessageDequeueBody(
+ ProtocolVersion(), qm.queue->getName(), qm.position));
+}
+
+void BrokerHandler::create(const broker::Queue& q) {
+ if (tssNoReplicate) return;
+ std::string data(q.encodedSize(), '\0');
+ framing::Buffer buf(&data[0], data.size());
+ q.encode(buf);
+ core.mcast(ClusterWiringCreateQueueBody(ProtocolVersion(), data));
+}
+
+void BrokerHandler::destroy(const broker::Queue& q) {
+ if (tssNoReplicate) return;
+ core.mcast(ClusterWiringDestroyQueueBody(ProtocolVersion(), q.getName()));
+}
+
+void BrokerHandler::create(const broker::Exchange& ex) {
+ if (tssNoReplicate) return;
+ std::string data(ex.encodedSize(), '\0');
+ framing::Buffer buf(&data[0], data.size());
+ ex.encode(buf);
+ core.mcast(ClusterWiringCreateExchangeBody(ProtocolVersion(), data));
+}
+
+void BrokerHandler::destroy(const broker::Exchange& ex) {
+ if (tssNoReplicate) return;
+ core.mcast(ClusterWiringDestroyExchangeBody(ProtocolVersion(), ex.getName()));
+}
+
+void BrokerHandler::bind(const broker::Queue& q, const broker::Exchange& ex,
+ const std::string& key, const framing::FieldTable& args)
+{
+ if (tssNoReplicate) return;
+ core.mcast(ClusterWiringBindBody(
+ ProtocolVersion(), q.getName(), ex.getName(), key, args));
+}
+
+void BrokerHandler::unbind(const broker::Queue& q, const broker::Exchange& ex,
+ const std::string& key, const framing::FieldTable& args)
+{
+ if (tssNoReplicate) return;
+ core.mcast(ClusterWiringUnbindBody(
+ ProtocolVersion(), q.getName(), ex.getName(), key, args));
+}
+
+}} // namespace qpid::cluster
Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.h?rev=1073448&view=auto
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.h (added)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.h Tue Feb 22 18:23:06 2011
@@ -0,0 +1,86 @@
+#ifndef QPID_CLUSTER_BROKERHANDLER_H
+#define QPID_CLUSTER_BROKERHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/Cluster.h"
+#include "qpid/sys/AtomicValue.h"
+
+namespace qpid {
+namespace cluster {
+class Core;
+
+// TODO aconway 2010-10-19: experimental cluster code.
+
+/**
+ * Implements broker::Cluster interface, handles events in broker code.
+ */
+class BrokerHandler : public broker::Cluster
+{
+ public:
+ /** Suppress replication while in scope.
+ * Used to prevent re-replication of messages received from the cluster.
+ */
+ struct ScopedSuppressReplication {
+ ScopedSuppressReplication();
+ ~ScopedSuppressReplication();
+ };
+
+ BrokerHandler(Core&);
+
+ // FIXME aconway 2010-10-20: implement all points.
+
+ // Messages
+
+ void routing(const boost::intrusive_ptr<broker::Message>&);
+ bool enqueue(broker::Queue&, const boost::intrusive_ptr<broker::Message>&);
+ void routed(const boost::intrusive_ptr<broker::Message>&);
+ void acquire(const broker::QueuedMessage&) {}
+ void release(const broker::QueuedMessage&) {}
+ void dequeue(const broker::QueuedMessage&);
+
+ // Consumers
+
+ void consume(const broker::Queue&, size_t) {}
+ void cancel(const broker::Queue&, size_t) {}
+
+ // Wiring
+
+ void create(const broker::Queue&);
+ void destroy(const broker::Queue&);
+ void create(const broker::Exchange&);
+ void destroy(const broker::Exchange&);
+ void bind(const broker::Queue&, const broker::Exchange&,
+ const std::string&, const framing::FieldTable&);
+ void unbind(const broker::Queue&, const broker::Exchange&,
+ const std::string&, const framing::FieldTable&);
+
+
+ private:
+ uint32_t nextRoutingId();
+
+ Core& core;
+ sys::AtomicValue<uint32_t> routingId;
+};
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_BROKERHANDLER_H*/
Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp?rev=1073448&view=auto
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp (added)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp Tue Feb 22 18:23:06 2011
@@ -0,0 +1,65 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <qpid/Options.h>
+#include <qpid/broker/Broker.h>
+#include "Core.h"
+
+namespace qpid {
+namespace cluster {
+using broker::Broker;
+
+// TODO aconway 2010-10-19: experimental new cluster code.
+
+/**
+ * Plugin for the cluster.
+ */
+struct Cluster2Plugin : public Plugin {
+ struct Opts : public Options {
+ Core::Settings& settings;
+ Opts(Core::Settings& s) : Options("Cluster Options"), settings(s) {
+ addOptions()
+ ("cluster2-name", optValue(settings.name, "NAME"), "Name of cluster to join");
+ // TODO aconway 2010-10-19: copy across other options from ClusterPlugin.h
+ }
+ };
+
+ Core::Settings settings;
+ Opts options;
+ Core* core; // Core deletes itself on shutdown.
+
+ Cluster2Plugin() : options(settings), core(0) {}
+
+ Options* getOptions() { return &options; }
+
+ void earlyInitialize(Plugin::Target& target) {
+ if (settings.name.empty()) return;
+ Broker* broker = dynamic_cast<Broker*>(&target);
+ if (!broker) return;
+ core = new Core(settings, *broker);
+ }
+
+ void initialize(Plugin::Target& target) {
+ Broker* broker = dynamic_cast<Broker*>(&target);
+ if (broker && core) core->initialize();
+ }
+};
+
+static Cluster2Plugin instance; // Static initialization.
+
+}} // namespace qpid::cluster
Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/Core.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/Core.cpp?rev=1073448&view=auto
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/Core.cpp (added)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/Core.cpp Tue Feb 22 18:23:06 2011
@@ -0,0 +1,75 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Core.h"
+#include "EventHandler.h"
+#include "BrokerHandler.h"
+#include "WiringHandler.h"
+#include "MessageHandler.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/SignalHandler.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/log/Statement.h"
+#include <sys/uio.h> // For iovec
+
+namespace qpid {
+namespace cluster {
+
+Core::Core(const Settings& s, broker::Broker& b) :
+ broker(b),
+ eventHandler(new EventHandler(*this))
+{
+ eventHandler->add(boost::shared_ptr<HandlerBase>(new WiringHandler(*eventHandler)));
+ eventHandler->add(boost::shared_ptr<HandlerBase>(new MessageHandler(*eventHandler)));
+
+ std::auto_ptr<BrokerHandler> bh(new BrokerHandler(*this));
+ brokerHandler = bh.get();
+ // BrokerHandler belongs to Broker
+ broker.setCluster(std::auto_ptr<broker::Cluster>(bh));
+ eventHandler->start();
+ eventHandler->getCpg().join(s.name);
+ // TODO aconway 2010-11-18: logging standards
+ QPID_LOG(notice, "cluster: joined " << s.name << ", member-id="<< eventHandler->getSelf());
+}
+
+void Core::initialize() {}
+
+void Core::fatal() {
+ // FIXME aconway 2010-10-20: error handling
+ assert(0);
+ broker::SignalHandler::shutdown();
+}
+
+void Core::mcast(const framing::AMQBody& body) {
+ QPID_LOG(trace, "cluster multicast: " << body);
+ // FIXME aconway 2010-10-20: use Multicaster, or bring in its features.
+ // here we multicast Frames rather than Events.
+ framing::AMQFrame f(body);
+ std::string data(f.encodedSize(), char());
+ framing::Buffer buf(&data[0], data.size());
+ f.encode(buf);
+ iovec iov = { buf.getPointer(), buf.getSize() };
+ while (!eventHandler->getCpg().mcast(&iov, 1))
+ ::usleep(1000); // FIXME aconway 2010-10-20: flow control
+}
+
+}} // namespace qpid::cluster
Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/Core.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/Core.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/Core.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/Core.h?rev=1073448&view=auto
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/Core.h (added)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/Core.h Tue Feb 22 18:23:06 2011
@@ -0,0 +1,93 @@
+#ifndef QPID_CLUSTER_CORE_H
+#define QPID_CLUSTER_CORE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include <memory>
+#include "LockedMap.h"
+#include "qpid/cluster/types.h"
+#include "qpid/cluster/Cpg.h"
+#include "qpid/broker/QueuedMessage.h"
+
+// TODO aconway 2010-10-19: experimental cluster code.
+
+namespace qpid {
+
+namespace framing{
+class AMQBody;
+}
+
+namespace broker {
+class Broker;
+}
+
+namespace cluster {
+class EventHandler;
+class BrokerHandler;
+
+/**
+ * Cluster core state machine.
+ * Holds together the various objects that implement cluster behavior,
+ * and holds state that is shared by multiple components.
+ *
+ * Thread safe: called from broker connection threads and CPG dispatch threads.
+ */
+class Core
+{
+ public:
+ /** Configuration settings */
+ struct Settings {
+ std::string name;
+ };
+
+ typedef LockedMap<RoutingId, boost::intrusive_ptr<broker::Message> > RoutingMap;
+
+ /** Constructed during Plugin::earlyInitialize() */
+ Core(const Settings&, broker::Broker&);
+
+ /** Called during Plugin::initialize() */
+ void initialize();
+
+ /** Shut down broker due to fatal error. Caller should log a critical message */
+ void fatal();
+
+ /** Multicast an event */
+ void mcast(const framing::AMQBody&);
+
+ broker::Broker& getBroker() { return broker; }
+ EventHandler& getEventHandler() { return *eventHandler; }
+ BrokerHandler& getBrokerHandler() { return *brokerHandler; }
+
+ /** Map of messages that are currently being routed.
+ * Used to pass messages being routed from BrokerHandler to MessageHandler
+ */
+ RoutingMap& getRoutingMap() { return routingMap; }
+ private:
+ broker::Broker& broker;
+ std::auto_ptr<EventHandler> eventHandler; // Handles CPG events.
+ BrokerHandler* brokerHandler; // Handles broker events.
+ RoutingMap routingMap;
+};
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_CORE_H*/
Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/Core.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/Core.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp?rev=1073448&view=auto
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp (added)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp Tue Feb 22 18:23:06 2011
@@ -0,0 +1,123 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Core.h"
+#include "EventHandler.h"
+#include "HandlerBase.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/cluster/types.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/AllInvoker.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace cluster {
+
+EventHandler::EventHandler(Core& c) :
+ core(c),
+ cpg(*this), // FIXME aconway 2010-10-20: belongs on Core.
+ dispatcher(cpg, core.getBroker().getPoller(), boost::bind(&Core::fatal, &core)),
+ self(cpg.self())
+{}
+
+EventHandler::~EventHandler() {}
+
+void EventHandler::add(const boost::shared_ptr<HandlerBase>& handler) {
+ handlers.push_back(handler);
+}
+
+void EventHandler::start() {
+ dispatcher.start();
+}
+
+// Print member ID or "self" if member is self
+struct PrettyId {
+ MemberId id, self;
+ PrettyId(const MemberId& id_, const MemberId& self_) : id(id_), self(self_) {}
+};
+
+std::ostream& operator<<(std::ostream& o, const PrettyId& id) {
+ if (id.id == id.self) return o << "self";
+ else return o << id.id;
+}
+
+// Deliver CPG message.
+void EventHandler::deliver(
+ cpg_handle_t /*handle*/,
+ const cpg_name* /*group*/,
+ uint32_t nodeid,
+ uint32_t pid,
+ void* msg,
+ int msg_len)
+{
+ sender = MemberId(nodeid, pid);
+ framing::Buffer buf(static_cast<char*>(msg), msg_len);
+ framing::AMQFrame frame;
+ while (buf.available()) {
+ frame.decode(buf);
+ assert(frame.getBody());
+ QPID_LOG(trace, "cluster deliver: " << PrettyId(sender, self) << " "
+ << *frame.getBody());
+ try {
+ invoke(*frame.getBody());
+ } catch (const std::exception& e) {
+ // Note: exceptions are assumed to be survivable,
+ // fatal errors should log a message and call Core::fatal.
+ QPID_LOG(error, e.what());
+ }
+ }
+}
+
+void EventHandler::invoke(const framing::AMQBody& body) {
+ for (Handlers::iterator i = handlers.begin(); i != handlers.end(); ++i)
+ if ((*i)->invoke(body)) return;
+ QPID_LOG(error, "Cluster received unknown control: " << body );
+ assert(0); // Error handling
+}
+
+struct PrintAddrs {
+ PrintAddrs(const cpg_address* a, int n ) : addrs(a), count(n) {}
+ const cpg_address* addrs;
+ int count;
+};
+
+std::ostream& operator<<(std::ostream& o, const PrintAddrs& pa) {
+ for (const cpg_address* a = pa.addrs; a != pa.addrs+pa.count; ++a)
+ o << MemberId(*a) << " ";
+ return o;
+}
+
+// CPG config-change callback.
+void EventHandler::configChange (
+ cpg_handle_t /*handle*/,
+ const cpg_name */*group*/,
+ const cpg_address *members, int nMembers,
+ const cpg_address *left, int nLeft,
+ const cpg_address *joined, int nJoined)
+{
+ // FIXME aconway 2010-10-20: TODO
+ QPID_LOG(notice, "cluster: new membership: " << PrintAddrs(members, nMembers));
+ QPID_LOG_IF(notice, nLeft, "cluster: members left: " << PrintAddrs(left, nLeft));
+ QPID_LOG_IF(notice, nJoined, "cluster: members joined: " << PrintAddrs(joined, nJoined));
+}
+
+}} // namespace qpid::cluster
Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/EventHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/EventHandler.h?rev=1073448&view=auto
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/EventHandler.h (added)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/EventHandler.h Tue Feb 22 18:23:06 2011
@@ -0,0 +1,95 @@
+#ifndef QPID_CLUSTER_EVENTHANDLER_H
+#define QPID_CLUSTER_EVENTHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+// TODO aconway 2010-10-19: experimental cluster code.
+
+#include "qpid/cluster/Cpg.h"
+#include "qpid/cluster/PollerDispatch.h"
+#include "qpid/cluster/types.h"
+#include <boost/shared_ptr.hpp>
+#include <vector>
+
+namespace qpid {
+
+namespace framing {
+class AMQBody;
+}
+
+namespace cluster {
+class Core;
+class HandlerBase;
+
+/**
+ * Dispatch events received from a CPG group.
+ * A container for Handler objects that handle specific cluster.xml classes.
+ * Thread unsafe: only called in its own CPG deliver thread context.
+ */
+class EventHandler : public Cpg::Handler
+{
+ public:
+ EventHandler(Core&);
+ ~EventHandler();
+
+ /** Add a handler */
+ void add(const boost::shared_ptr<HandlerBase>&);
+
+ /** Start polling */
+ void start();
+
+ void deliver( // CPG deliver callback.
+ cpg_handle_t /*handle*/,
+ const struct cpg_name *group,
+ uint32_t /*nodeid*/,
+ uint32_t /*pid*/,
+ void* /*msg*/,
+ int /*msg_len*/);
+
+ void configChange( // CPG config change callback.
+ cpg_handle_t /*handle*/,
+ const struct cpg_name */*group*/,
+ const struct cpg_address */*members*/, int /*nMembers*/,
+ const struct cpg_address */*left*/, int /*nLeft*/,
+ const struct cpg_address */*joined*/, int /*nJoined*/
+ );
+
+ MemberId getSender() { return sender; }
+ MemberId getSelf() { return self; }
+ Core& getCore() { return core; }
+ Cpg& getCpg() { return cpg; }
+
+ private:
+ void invoke(const framing::AMQBody& body);
+
+ Core& core;
+ Cpg cpg;
+ PollerDispatch dispatcher;
+ MemberId sender; // sender of current event.
+ MemberId self;
+
+ typedef std::vector<boost::shared_ptr<HandlerBase> > Handlers;
+ Handlers handlers;
+};
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_EVENTHANDLER_H*/
Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/EventHandler.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/EventHandler.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/HandlerBase.cpp (from r1073372, qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/QueuedMessage.h)
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/HandlerBase.cpp?p2=qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/HandlerBase.cpp&p1=qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/QueuedMessage.h&r1=1073372&r2=1073448&rev=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/QueuedMessage.h (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/HandlerBase.cpp Tue Feb 22 18:23:06 2011
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,31 +18,19 @@
* under the License.
*
*/
-#ifndef _QueuedMessage_
-#define _QueuedMessage_
-
-#include "qpid/broker/Message.h"
+#include "HandlerBase.h"
+#include "EventHandler.h"
namespace qpid {
-namespace broker {
+namespace cluster {
-class Queue;
+HandlerBase::HandlerBase(EventHandler& eh) : eventHandler(eh) {}
-struct QueuedMessage
-{
- boost::intrusive_ptr<Message> payload;
- framing::SequenceNumber position;
- Queue* queue;
+HandlerBase::~HandlerBase() {}
- QueuedMessage() : queue(0) {}
- QueuedMessage(Queue* q, boost::intrusive_ptr<Message> msg, framing::SequenceNumber sn) :
- payload(msg), position(sn), queue(q) {}
- QueuedMessage(Queue* q) : queue(q) {}
-
-};
- inline bool operator<(const QueuedMessage& a, const QueuedMessage& b) { return a.position < b.position; }
+MemberId HandlerBase::sender() { return eventHandler.getSender(); }
-}}
+MemberId HandlerBase::self() { return eventHandler.getSelf(); }
-#endif
+}} // namespace qpid::cluster
Copied: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h (from r1073372, qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/QueuedMessage.h)
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h?p2=qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h&p1=qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/QueuedMessage.h&r1=1073372&r2=1073448&rev=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/QueuedMessage.h (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h Tue Feb 22 18:23:06 2011
@@ -1,3 +1,6 @@
+#ifndef QPID_CLUSTER_HANDLERBASE_H
+#define QPID_CLUSTER_HANDLERBASE_H
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -7,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,31 +21,34 @@
* under the License.
*
*/
-#ifndef _QueuedMessage_
-#define _QueuedMessage_
-
-#include "qpid/broker/Message.h"
+#include "qpid/cluster/types.h"
namespace qpid {
-namespace broker {
-class Queue;
+namespace framing {
+class AMQBody;
+}
-struct QueuedMessage
+namespace cluster {
+class EventHandler;
+
+/**
+ * Base class for handlers of events, children of the EventHandler.
+ */
+class HandlerBase
{
- boost::intrusive_ptr<Message> payload;
- framing::SequenceNumber position;
- Queue* queue;
-
- QueuedMessage() : queue(0) {}
- QueuedMessage(Queue* q, boost::intrusive_ptr<Message> msg, framing::SequenceNumber sn) :
- payload(msg), position(sn), queue(q) {}
- QueuedMessage(Queue* q) : queue(q) {}
-
+ public:
+ HandlerBase(EventHandler&);
+ virtual ~HandlerBase();
+
+ virtual bool invoke(const framing::AMQBody& body) = 0;
+
+ protected:
+ EventHandler& eventHandler;
+ MemberId sender();
+ MemberId self();
};
- inline bool operator<(const QueuedMessage& a, const QueuedMessage& b) { return a.position < b.position; }
-
-}}
+}} // namespace qpid::cluster
-#endif
+#endif /*!QPID_CLUSTER_HANDLERBASE_H*/
Added: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/LockedMap.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/LockedMap.h?rev=1073448&view=auto
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/LockedMap.h (added)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/LockedMap.h Tue Feb 22 18:23:06 2011
@@ -0,0 +1,73 @@
+#ifndef QPID_CLUSTER_LOCKEDMAP_H
+#define QPID_CLUSTER_LOCKEDMAP_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Mutex.h"
+#include <map>
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * A reader-writer locked thread safe map.
+ */
+template <class Key, class Value>
+class LockedMap
+{
+ public:
+ /** Get value associated with key, returns Value() if none. */
+ Value get(const Key& key) const {
+ sys::RWlock::ScopedRlock r(lock);
+ typename Map::const_iterator i = map.find(key);
+ if (i == map.end()) return Value();
+ else return i->second;
+ }
+
+ /** Associate value with key, overwriting any previous value for key. */
+ void put(const Key& key, const Value& value) {
+ sys::RWlock::ScopedWlock w(lock);
+ map[key] = value;
+ }
+
+ /** Associate value with key if there is not already a value associated with key.
+ * Returns true if the value was added.
+ */
+ bool add(const Key& key, const Value& value) {
+ sys::RWlock::ScopedWlock w(lock);
+ return map.insert(key, value).second;
+ }
+
+ /** Erase the value associated with key if any. Return true if a value was erased. */
+ bool erase(const Key& key) {
+ sys::RWlock::ScopedWlock w(lock);
+ return map.erase(key);
+ }
+
+ private:
+ typedef std::map<Key, Value> Map;
+ Map map;
+ mutable sys::RWlock lock;
+};
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_LOCKEDMAP_H*/
Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/LockedMap.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/LockedMap.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org