You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2011/02/22 19:23:07 UTC

svn commit: r1073448 [1/2] - in /qpid/branches/qpid-2920/qpid/cpp: design_docs/ src/ src/qpid/broker/ src/qpid/cluster/ src/qpid/cluster/exp/ src/tests/ xml/

Author: aconway
Date: Tue Feb 22 18:23:06 2011
New Revision: 1073448

URL: http://svn.apache.org/viewvc?rev=1073448&view=rev
Log:
QPID-2920: First cut experimental prototype for new cluster.

Experimental code to investigate & measure performance of new cluster design ideas.
Experimental classes are in src/qpid/cluster/exp.

New broker::Cluster interface provides call points for cluster. Similar to
store but has more operations, may be merged at a future point.

Added:
    qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Cluster.h   (with props)
    qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/NullCluster.h   (with props)
    qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/
    qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.cpp   (with props)
    qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.h   (with props)
    qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp   (with props)
    qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/Core.cpp   (with props)
    qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/Core.h   (with props)
    qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp   (with props)
    qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/EventHandler.h   (with props)
    qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/HandlerBase.cpp
      - copied, changed from r1073372, qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/QueuedMessage.h
    qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h
      - copied, changed from r1073372, qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/QueuedMessage.h
    qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/LockedMap.h   (with props)
    qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp   (with props)
    qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h   (with props)
    qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/README.txt   (with props)
    qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp   (with props)
    qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h   (with props)
    qpid/branches/qpid-2920/qpid/cpp/src/tests/BrokerClusterCalls.cpp   (with props)
    qpid/branches/qpid-2920/qpid/cpp/src/tests/cluster2_tests.py   (with props)
Modified:
    qpid/branches/qpid-2920/qpid/cpp/design_docs/new-cluster-plan.txt
    qpid/branches/qpid-2920/qpid/cpp/src/Makefile.am
    qpid/branches/qpid-2920/qpid/cpp/src/cluster.mk
    qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Broker.h
    qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Exchange.cpp
    qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
    qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
    qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/QueuedMessage.h
    qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/RecoverableExchange.h
    qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
    qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp
    qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/PollerDispatch.h
    qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/types.h
    qpid/branches/qpid-2920/qpid/cpp/src/tests/Makefile.am
    qpid/branches/qpid-2920/qpid/cpp/src/tests/brokertest.py
    qpid/branches/qpid-2920/qpid/cpp/src/tests/cluster.mk
    qpid/branches/qpid-2920/qpid/cpp/src/tests/run_cluster_tests
    qpid/branches/qpid-2920/qpid/cpp/src/tests/test_env.sh.in
    qpid/branches/qpid-2920/qpid/cpp/xml/cluster.xml

Modified: qpid/branches/qpid-2920/qpid/cpp/design_docs/new-cluster-plan.txt
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/design_docs/new-cluster-plan.txt?rev=1073448&r1=1073447&r2=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/design_docs/new-cluster-plan.txt (original)
+++ qpid/branches/qpid-2920/qpid/cpp/design_docs/new-cluster-plan.txt Tue Feb 22 18:23:06 2011
@@ -146,10 +146,6 @@ reject(QueuedMessage):
   isRejecting = true
   mcast reject(qmsg)
 
-# FIXME no longer needed?
-drop(QueuedMessage)
-  cleanup(qmsg)
-
 *** MessageHandler and mcast messages
 Types:
 - struct QueueEntry { QueuedMessage qmsg; NodeId acquired; }
@@ -348,6 +344,9 @@ For 0-10 can use channel numbers & send 
 Extend broker::Cluster interface to capture transaction context and completion.
 Sequence number to generate per-node tx IDs.
 Replicate transaction completion.
+** TODO [#B] Management support
+- Replicate management methods that modify queues - e.g. move, purge.
+- Report connections - local only or cluster-wide?
 ** TODO [#B] Batch CPG multicast messages
 The new cluster design involves a lot of small multicast messages,
 they need to be batched into larger CPG messages for efficiency.

Modified: qpid/branches/qpid-2920/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/Makefile.am?rev=1073448&r1=1073447&r2=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/Makefile.am (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/Makefile.am Tue Feb 22 18:23:06 2011
@@ -517,6 +517,7 @@ libqpidbroker_la_SOURCES = \
   qpid/broker/Broker.cpp \
   qpid/broker/Broker.h \
   qpid/broker/BrokerImportExport.h \
+  qpid/broker/Cluster.h \
   qpid/broker/Connection.cpp \
   qpid/broker/Connection.h \
   qpid/broker/ConnectionFactory.cpp \
@@ -586,6 +587,7 @@ libqpidbroker_la_SOURCES = \
   qpid/broker/PriorityQueue.cpp \
   qpid/broker/NameGenerator.cpp \
   qpid/broker/NameGenerator.h \
+  qpid/broker/NullCluster.h \
   qpid/broker/NullMessageStore.cpp \
   qpid/broker/NullMessageStore.h \
   qpid/broker/OwnershipToken.h \

Modified: qpid/branches/qpid-2920/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/cluster.mk?rev=1073448&r1=1073447&r2=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/cluster.mk (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/cluster.mk Tue Feb 22 18:23:06 2011
@@ -35,7 +35,6 @@ endif
 if HAVE_LIBCPG
 
 dmodule_LTLIBRARIES += cluster.la
-
 cluster_la_SOURCES =				\
   $(CMAN_SOURCES)				\
   qpid/cluster/Cluster.cpp			\
@@ -101,6 +100,30 @@ cluster_la_LIBADD=  -lcpg $(libcman) lib
 cluster_la_CXXFLAGS = $(AM_CXXFLAGS) -fno-strict-aliasing
 cluster_la_LDFLAGS = $(PLUGINLDFLAGS)
 
+# Experimental new cluster plugin
+dmodule_LTLIBRARIES += cluster2.la
+cluster2_la_LIBADD = -lcpg libqpidbroker.la
+cluster2_la_LDFLAGS = $(PLUGINLDFLAGS)
+cluster2_la_SOURCES =				\
+	qpid/cluster/Cpg.cpp			\
+	qpid/cluster/Cpg.h			\
+	qpid/cluster/PollerDispatch.cpp		\
+	qpid/cluster/PollerDispatch.h		\
+	qpid/cluster/exp/BrokerHandler.cpp	\
+	qpid/cluster/exp/BrokerHandler.h	\
+	qpid/cluster/exp/Cluster2Plugin.cpp	\
+	qpid/cluster/exp/Core.cpp		\
+	qpid/cluster/exp/Core.h			\
+	qpid/cluster/exp/EventHandler.cpp	\
+	qpid/cluster/exp/EventHandler.h		\
+	qpid/cluster/exp/HandlerBase.cpp	\
+	qpid/cluster/exp/HandlerBase.h		\
+	qpid/cluster/exp/MessageHandler.cpp	\
+	qpid/cluster/exp/MessageHandler.h	\
+	qpid/cluster/exp/WiringHandler.cpp	\
+	qpid/cluster/exp/WiringHandler.h
+
+
 # The watchdog plugin and helper executable
 dmodule_LTLIBRARIES += watchdog.la
 watchdog_la_SOURCES = qpid/cluster/WatchDogPlugin.cpp

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1073448&r1=1073447&r2=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Broker.cpp Tue Feb 22 18:23:06 2011
@@ -25,6 +25,7 @@
 #include "qpid/broker/FanOutExchange.h"
 #include "qpid/broker/HeadersExchange.h"
 #include "qpid/broker/MessageStoreModule.h"
+#include "qpid/broker/NullCluster.h"
 #include "qpid/broker/NullMessageStore.h"
 #include "qpid/broker/RecoveryManagerImpl.h"
 #include "qpid/broker/SaslAuthenticator.h"
@@ -174,6 +175,7 @@ Broker::Broker(const Broker::Options& co
                                                           conf.qmf2Support)
                                     : 0),
     store(new NullMessageStore),
+    cluster(new NullCluster),
     acl(0),
     dataDir(conf.noDataDir ? std::string() : conf.dataDir),
     queues(this),
@@ -758,7 +760,6 @@ void Broker::setClusterTimer(std::auto_p
 
 const std::string Broker::TCP_TRANSPORT("tcp");
 
-
 std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue(
     const std::string& name,
     bool durable,
@@ -819,10 +820,11 @@ std::pair<boost::shared_ptr<Queue>, bool
 void Broker::deleteQueue(const std::string& name, const std::string& userId,
                          const std::string& connectionId, QueueFunctor check)
 {
-    if (acl && !acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_QUEUE,name,NULL)) {
-        throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue delete request from " << userId));
+    if ((userId.size() || connectionId.size()) && // Skip ACL check if ID is empty.
+        acl && !acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_QUEUE,name,NULL)) {
+            throw framing::UnauthorizedAccessException(
+                QPID_MSG("ACL denied queue delete request from " << userId));
     }
-
     Queue::shared_ptr queue = queues.find(name);
     if (queue) {
         if (check) check(queue);
@@ -886,6 +888,7 @@ std::pair<Exchange::shared_ptr, bool> Br
                                                          ManagementAgent::toMap(arguments),
                                                          "created"));
         }
+        getCluster().create(*result.first);
     }
     return result;
 }
@@ -893,8 +896,8 @@ std::pair<Exchange::shared_ptr, bool> Br
 void Broker::deleteExchange(const std::string& name, const std::string& userId,
                            const std::string& connectionId)
 {
-    if (acl) {
-        if (!acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,NULL) )
+    if ((userId.size() || connectionId.size()) && // Skip ACL check if ID is empty.
+        acl && !acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,NULL)) {
             throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange delete request from " << userId));
     }
 
@@ -907,7 +910,7 @@ void Broker::deleteExchange(const std::s
 
     if (managementAgent.get())
         managementAgent->raiseEvent(_qmf::EventExchangeDelete(connectionId, userId, name));
-
+    getCluster().destroy(*exchange);
 }
 
 void Broker::bind(const std::string& queueName,
@@ -939,6 +942,7 @@ void Broker::bind(const std::string& que
                                                   queueName, key, ManagementAgent::toMap(arguments)));
             }
         }
+        getCluster().bind(*queue, *exchange, key, arguments);
     }
 }
 
@@ -955,7 +959,6 @@ void Broker::unbind(const std::string& q
         if (!acl->authorise(userId,acl::ACT_UNBIND,acl::OBJ_EXCHANGE,exchangeName,&params) )
             throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange unbind request from " << userId));
     }
-
     Queue::shared_ptr queue = queues.find(queueName);
     Exchange::shared_ptr exchange = exchanges.get(exchangeName);
     if (!queue) {
@@ -965,14 +968,19 @@ void Broker::unbind(const std::string& q
     } else {
         if (exchange->unbind(queue, key, 0)) {
             if (exchange->isDurable() && queue->isDurable()) {
-                store->unbind(*exchange, *queue, key, qpid::framing::FieldTable());
+                store->unbind(*exchange, *queue, key, framing::FieldTable());
             }
             if (managementAgent.get()) {
                 managementAgent->raiseEvent(_qmf::EventUnbind(connectionId, userId, exchangeName, queueName, key));
             }
+            getCluster().unbind(*queue, *exchange, key, framing::FieldTable());
         }
     }
 }
 
+void Broker::setCluster(std::auto_ptr<Cluster> c) { cluster = c; }
+
+Cluster& Broker::getCluster() { return *cluster; }
+
 }} // namespace qpid::broker
 

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Broker.h?rev=1073448&r1=1073447&r2=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Broker.h Tue Feb 22 18:23:06 2011
@@ -72,6 +72,7 @@ namespace broker {
 class ConnectionState;
 class ExpiryPolicy;
 class Message;
+class Cluster;
 
 static const  uint16_t DEFAULT_PORT=5672;
 
@@ -164,6 +165,7 @@ public:
     std::auto_ptr<management::ManagementAgent> managementAgent;
     ProtocolFactoryMap protocolFactories;
     std::auto_ptr<MessageStore> store;
+    std::auto_ptr<Cluster> cluster;
     AclModule* acl;
     DataDir dataDir;
 
@@ -293,6 +295,9 @@ public:
     bool isClusterUpdatee() const { return clusterUpdatee; }
     void setClusterUpdatee(bool set) { clusterUpdatee = set; }
 
+    QPID_BROKER_EXTERN void setCluster(std::auto_ptr<Cluster> c);
+    QPID_BROKER_EXTERN Cluster& getCluster();
+
     management::ManagementAgent* getManagementAgent() { return managementAgent.get(); }
 
     ConnectionCounter& getConnectionCounter() {return connectionCounter;}

Added: qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Cluster.h?rev=1073448&view=auto
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Cluster.h (added)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Cluster.h Tue Feb 22 18:23:06 2011
@@ -0,0 +1,104 @@
+#ifndef QPID_BROKER_CLUSTER_H
+#define QPID_BROKER_CLUSTER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/intrusive_ptr.hpp>
+
+namespace qpid {
+
+namespace framing {
+class FieldTable;
+}
+
+namespace broker {
+
+class Message;
+struct QueuedMessage;
+class Queue;
+class Exchange;
+
+/**
+ * NOTE: this is part of an experimental cluster implementation that is not
+ * yet fully functional. The original cluster implementation remains in place.
+ * See ../cluster/new-cluster-design.txt
+ *
+ * Interface for cluster implementations. Functions on this interface are
+ * called at relevant points in the Broker's processing.
+ */
+class Cluster
+{
+  public:
+    virtual ~Cluster() {}
+
+    // Messages
+
+    /** In Exchange::route, before the message is enqueued. */
+    virtual void routing(const boost::intrusive_ptr<Message>&) = 0;
+
+    /** A message is delivered to a queue.
+     * Called before actually pushing the message to the queue.
+     *@return If true the message should be pushed to the queue now.
+     * otherwise the cluster code will push the message when it is replicated.
+     */
+    virtual bool enqueue(Queue& queue, const boost::intrusive_ptr<Message>&) = 0;
+
+    /** In Exchange::route, after all enqueues for the message. */
+    virtual void routed(const boost::intrusive_ptr<Message>&) = 0;
+
+    /** A message is acquired by a local consumer, it is unavailable to replicas. */
+    virtual void acquire(const QueuedMessage&) = 0;
+
+    /** A locally-acquired message is released by the consumer and re-queued. */
+    virtual void release(const QueuedMessage&) = 0;
+
+    /** A message is removed from the queue. */
+    virtual void dequeue(const QueuedMessage&) = 0;
+
+    // Consumers
+
+    /** A new consumer subscribes to a queue. */
+    virtual void consume(const Queue&, size_t consumerCount) = 0;
+    /** A consumer cancels its subscription to a queue */
+    virtual void cancel(const Queue&, size_t consumerCount) = 0;
+
+    // Wiring
+
+    /** A queue is created */
+    virtual void create(const Queue&) = 0;
+    /** A queue is destroyed */
+    virtual void destroy(const Queue&) = 0;
+    /** An exchange is created */
+    virtual void create(const Exchange&) = 0;
+    /** An exchange is destroyed */
+    virtual void destroy(const Exchange&) = 0;
+    /** A binding is created */
+    virtual void bind(const Queue&, const Exchange&,
+                      const std::string& key, const framing::FieldTable& args) = 0;
+    /** A binding is removed */
+    virtual void unbind(const Queue&, const Exchange&,
+                        const std::string& key, const framing::FieldTable& args) = 0;
+};
+
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_CLUSTER_H*/

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Cluster.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Cluster.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=1073448&r1=1073447&r2=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Exchange.cpp Tue Feb 22 18:23:06 2011
@@ -23,6 +23,7 @@
 #include "qpid/broker/ExchangeRegistry.h"
 #include "qpid/broker/FedOps.h"
 #include "qpid/broker/Broker.h"
+#include "qpid/broker/Cluster.h"
 #include "qpid/management/ManagementAgent.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/log/Statement.h"
@@ -70,10 +71,23 @@ Exchange::PreRoute::~PreRoute(){
     }
 }
 
+// Bracket a scope with calls to Cluster::routing and Cluster::routed
+struct ScopedClusterRouting {
+    Broker* broker;
+    boost::intrusive_ptr<Message> message;
+    ScopedClusterRouting(Broker* b, boost::intrusive_ptr<Message> m)
+        : broker(b), message(m) {
+        if (broker) broker->getCluster().routing(message);
+    }
+    ~ScopedClusterRouting() {
+        if (broker) broker->getCluster().routed(message);
+    }
+};
+
 void Exchange::doRoute(Deliverable& msg, ConstBindingList b)
 {
+    ScopedClusterRouting scr(broker, &msg.getMessage());
     int count = 0;
-
     if (b.get()) {
         // Block the content release if the message is transient AND there is more than one binding
         if (!msg.getMessage().isPersistent() && b->size() > 1) {

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp?rev=1073448&r1=1073447&r2=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp Tue Feb 22 18:23:06 2011
@@ -24,6 +24,9 @@
 #include "qpid/broker/FanOutExchange.h"
 #include "qpid/broker/HeadersExchange.h"
 #include "qpid/broker/TopicExchange.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/Cluster.h"
+#include "qpid/log/Statement.h"
 #include "qpid/management/ManagementDirectExchange.h"
 #include "qpid/management/ManagementTopicExchange.h"
 #include "qpid/framing/reply_exceptions.h"
@@ -87,11 +90,15 @@ void ExchangeRegistry::destroy(const str
 }
 
 Exchange::shared_ptr ExchangeRegistry::get(const string& name){
+    Exchange::shared_ptr ex = find(name);
+    if (!ex) throw framing::NotFoundException(QPID_MSG("Exchange not found: " << name));
+    return ex;
+}
+
+Exchange::shared_ptr ExchangeRegistry::find(const string& name){
     RWlock::ScopedRlock locker(lock);
     ExchangeMap::iterator i =  exchanges.find(name);
-    if (i == exchanges.end())
-        throw framing::NotFoundException(QPID_MSG("Exchange not found: " << name));
-    return i->second;
+    return (i == exchanges.end()) ? Exchange::shared_ptr() : i->second;
 }
 
 bool ExchangeRegistry::registerExchange(const Exchange::shared_ptr& ex) {

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/ExchangeRegistry.h?rev=1073448&r1=1073447&r2=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/ExchangeRegistry.h (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/ExchangeRegistry.h Tue Feb 22 18:23:06 2011
@@ -55,6 +55,7 @@ class ExchangeRegistry{
        const qpid::framing::FieldTable& args = framing::FieldTable());
     QPID_BROKER_EXTERN void destroy(const std::string& name);
     QPID_BROKER_EXTERN Exchange::shared_ptr get(const std::string& name);
+    QPID_BROKER_EXTERN Exchange::shared_ptr find(const std::string& name);
     Exchange::shared_ptr getDefault();
 
     /**

Added: qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/NullCluster.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/NullCluster.h?rev=1073448&view=auto
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/NullCluster.h (added)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/NullCluster.h Tue Feb 22 18:23:06 2011
@@ -0,0 +1,66 @@
+#ifndef QPID_BROKER_NULLCLUSTER_H
+#define QPID_BROKER_NULLCLUSTER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/broker/Cluster.h>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * No-op implementation of Cluster interface, installed by broker when
+ * no cluster plug-in is present or clustering is disabled.
+ */
+class NullCluster : public Cluster
+{
+  public:
+
+    // Messages
+
+    virtual void routing(const boost::intrusive_ptr<Message>&) {}
+    virtual bool enqueue(Queue&, const boost::intrusive_ptr<Message>&) { return true; }
+    virtual void routed(const boost::intrusive_ptr<Message>&) {}
+    virtual void acquire(const QueuedMessage&) {}
+    virtual void release(const QueuedMessage&) {}
+    virtual void dequeue(const QueuedMessage&) {}
+
+    // Consumers
+
+    virtual void consume(const Queue&, size_t) {}
+    virtual void cancel(const Queue&, size_t) {}
+
+    // Wiring
+
+    virtual void create(const Queue&) {}
+    virtual void destroy(const Queue&) {}
+    virtual void create(const Exchange&) {}
+    virtual void destroy(const Exchange&) {}
+    virtual void bind(const Queue&, const Exchange&,
+                      const std::string&, const framing::FieldTable&) {}
+    virtual void unbind(const Queue&, const Exchange&,
+                        const std::string&, const framing::FieldTable&) {}
+};
+
+}} // namespace qpid::broker
+
+#endif

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/NullCluster.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/NullCluster.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1073448&r1=1073447&r2=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/Queue.cpp Tue Feb 22 18:23:06 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,6 +20,7 @@
  */
 
 #include "qpid/broker/Broker.h"
+#include "qpid/broker/Cluster.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/QueueEvents.h"
 #include "qpid/broker/Exchange.h"
@@ -65,7 +66,7 @@ using std::mem_fun;
 namespace _qmf = qmf::org::apache::qpid::broker;
 
 
-namespace 
+namespace
 {
 const std::string qpidMaxSize("qpid.max_size");
 const std::string qpidMaxCount("qpid.max_count");
@@ -87,16 +88,16 @@ const int ENQUEUE_ONLY=1;
 const int ENQUEUE_AND_DEQUEUE=2;
 }
 
-Queue::Queue(const string& _name, bool _autodelete, 
+Queue::Queue(const string& _name, bool _autodelete,
              MessageStore* const _store,
              const OwnershipToken* const _owner,
              Manageable* parent,
              Broker* b) :
 
-    name(_name), 
+    name(_name),
     autodelete(_autodelete),
     store(_store),
-    owner(_owner), 
+    owner(_owner),
     consumerCount(0),
     exclusive(0),
     noLocal(false),
@@ -152,6 +153,10 @@ void Queue::deliver(boost::intrusive_ptr
     // Check for deferred delivery in a cluster.
     if (broker && broker->deferDelivery(name, msg))
         return;
+    // Same thing but for the new cluster interface.
+    if (broker && !broker->getCluster().enqueue(*this, msg))
+        return;
+
     if (msg->isImmediate() && getConsumerCount() == 0) {
         if (alternateExchange) {
             DeliverableMessage deliverable(msg);
@@ -179,9 +184,9 @@ void Queue::recover(boost::intrusive_ptr
     if (policy.get()) policy->recoverEnqueued(msg);
 
     push(msg, true);
-    if (store){ 
+    if (store){
         // setup synclist for recovered messages, so they don't get re-stored on lastNodeFailure
-        msg->addToSyncList(shared_from_this(), store); 
+        msg->addToSyncList(shared_from_this(), store);
     }
     msg->enqueueComplete(); // mark the message as enqueued
 
@@ -207,14 +212,14 @@ void Queue::process(boost::intrusive_ptr
 void Queue::requeue(const QueuedMessage& msg){
     assertClusterSafe();
     QueueListeners::NotificationSet copy;
-    {    
+    {
         Mutex::ScopedLock locker(messageLock);
         if (!isEnqueued(msg)) return;
         msg.payload->enqueueComplete(); // mark the message as enqueued
         messages->reinsert(msg);
         listeners.populate(copy);
 
-        // for persistLastNode - don't force a message twice to disk, but force it if no force before 
+        // for persistLastNode - don't force a message twice to disk, but force it if no force before
         if(inLastNodeFailure && persistLastNode && !msg.payload->isStoredOnQueue(shared_from_this())) {
             msg.payload->forcePersistent();
             if (msg.payload->isForcedPersistent() ){
@@ -223,16 +228,33 @@ void Queue::requeue(const QueuedMessage&
             }
         }
     }
+    if (broker) broker->getCluster().release(msg);
     copy.notify();
 }
 
-bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) 
+// Inform the cluster of an acquired message on exit from a function
+// that does the acquiring. ClusterAcquireOnExit is declared *before*
+// any locks are taken. The calling function sets qmsg to the acquired
+// message with a lock held, but the call to Cluster::acquire() will
+// be outside the lock.
+struct ClusterAcquireOnExit {
+    Broker* broker;
+    QueuedMessage qmsg;
+    ClusterAcquireOnExit(Broker* b) : broker(b) {}
+    ~ClusterAcquireOnExit() {
+        if (broker && qmsg.queue) broker->getCluster().acquire(qmsg);
+    }
+};
+
+bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
 {
+    ClusterAcquireOnExit willAcquire(broker); // Outside lock
     Mutex::ScopedLock locker(messageLock);
     assertClusterSafe();
     QPID_LOG(debug, "Attempting to acquire message at " << position);
     if (messages->remove(position, message)) {
         QPID_LOG(debug, "Acquired message at " << position << " from " << name);
+        willAcquire.qmsg = message;
         return true;
     } else {
         QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position");
@@ -270,7 +292,7 @@ bool Queue::getNextMessage(QueuedMessage
           case NO_MESSAGES:
           default:
             return false;
-        }        
+        }
     } else {
         return browseNextMessage(m, c);
     }
@@ -279,8 +301,9 @@ bool Queue::getNextMessage(QueuedMessage
 Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
 {
     while (true) {
+        ClusterAcquireOnExit willAcquire(broker); // Outside the lock
         Mutex::ScopedLock locker(messageLock);
-        if (messages->empty()) { 
+        if (messages->empty()) {
             QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
             listeners.addListener(c);
             return NO_MESSAGES;
@@ -293,8 +316,9 @@ Queue::ConsumeCode Queue::consumeNextMes
             }
 
             if (c->filter(msg.payload)) {
-                if (c->accept(msg.payload)) {            
+                if (c->accept(msg.payload)) {
                     m = msg;
+                    willAcquire.qmsg = msg;
                     pop();
                     return CONSUMED;
                 } else {
@@ -306,7 +330,7 @@ Queue::ConsumeCode Queue::consumeNextMes
                 //consumer will never want this message
                 QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
                 return CANT_CONSUME;
-            } 
+            }
         }
     }
 }
@@ -360,7 +384,7 @@ bool Queue::dispatch(Consumer::shared_pt
     }
 }
 
-// Find the next message 
+// Find the next message
 bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) {
     Mutex::ScopedLock locker(messageLock);
     if (messages->next(c->position, msg)) {
@@ -379,42 +403,51 @@ QueuedMessage Queue::find(SequenceNumber
     return msg;
 }
 
-void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
+void Queue::consume(Consumer::shared_ptr c, bool requestExclusive) {
     assertClusterSafe();
-    Mutex::ScopedLock locker(consumerLock);
-    if(exclusive) {
-        throw ResourceLockedException(
-            QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
-    } else if(requestExclusive) {
-        if(consumerCount) {
+    size_t consumers;
+    {
+        Mutex::ScopedLock locker(consumerLock);
+        if(exclusive) {
             throw ResourceLockedException(
-                QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
-        } else {
-            exclusive = c->getSession();
+                QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
+        } else if(requestExclusive) {
+            if(consumerCount) {
+                throw ResourceLockedException(
+                    QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
+            } else {
+                exclusive = c->getSession();
+            }
         }
+        consumers = ++consumerCount;
     }
-    consumerCount++;
     if (mgmtObject != 0)
         mgmtObject->inc_consumerCount ();
     //reset auto deletion timer if necessary
     if (autoDeleteTimeout && autoDeleteTask) {
         autoDeleteTask->cancel();
     }
+    if (broker) broker->getCluster().consume(*this, consumers);
 }
 
 void Queue::cancel(Consumer::shared_ptr c){
     removeListener(c);
-    Mutex::ScopedLock locker(consumerLock);
-    consumerCount--;
-    if(exclusive) exclusive = 0;
-    if (mgmtObject != 0)
-        mgmtObject->dec_consumerCount ();
+    size_t consumers;
+    {
+        Mutex::ScopedLock locker(consumerLock);
+        consumers = --consumerCount;
+        if(exclusive) exclusive = 0;
+        if (mgmtObject != 0)
+            mgmtObject->dec_consumerCount ();
+    }
+    if (broker) broker->getCluster().cancel(*this, consumers);
 }
 
 QueuedMessage Queue::get(){
+    ClusterAcquireOnExit willAcquire(broker); // Outside lock
     Mutex::ScopedLock locker(messageLock);
     QueuedMessage msg(this);
-    messages->pop(msg);
+    if (messages->pop(msg)) willAcquire.qmsg = msg;
     return msg;
 }
 
@@ -432,7 +465,7 @@ void Queue::purgeExpired()
 {
     //As expired messages are discarded during dequeue also, only
     //bother explicitly expiring if the rate of dequeues since last
-    //attempt is less than one per second.  
+    //attempt is less than one per second.
 
     if (dequeueTracker.sampleRatePerSecond() < 1) {
         std::deque<QueuedMessage> expired;
@@ -459,7 +492,7 @@ void Queue::purgeExpired()
 uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest)
 {
     Mutex::ScopedLock locker(messageLock);
-    uint32_t purge_count = purge_request; // only comes into play if  >0 
+    uint32_t purge_count = purge_request; // only comes into play if  >0
     std::deque<DeliverableMessage> rerouteQueue;
 
     uint32_t count = 0;
@@ -493,7 +526,7 @@ uint32_t Queue::purge(const uint32_t pur
 
 uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) {
     Mutex::ScopedLock locker(messageLock);
-    uint32_t move_count = qty; // only comes into play if  qty >0 
+    uint32_t move_count = qty; // only comes into play if  qty >0
     uint32_t count = 0; // count how many were moved for returning
 
     while((!qty || move_count--) && !messages->empty()) {
@@ -516,14 +549,15 @@ void Queue::pop()
 
 void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
     assertClusterSafe();
+    QueuedMessage qm;
     QueueListeners::NotificationSet copy;
     QueuedMessage removed;
     bool dequeueRequired = false;
     {
-        Mutex::ScopedLock locker(messageLock);   
-        QueuedMessage qm(this, msg, ++sequence);
+        Mutex::ScopedLock locker(messageLock);
+        qm = QueuedMessage(this, msg, ++sequence);
         if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence);
-         
+
         dequeueRequired = messages->push(qm, removed);
         listeners.populate(copy);
         enqueued(qm);
@@ -602,7 +636,7 @@ void Queue::setLastNodeFailure()
 }
 
 
-// return true if store exists, 
+// return true if store exists,
 bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg, bool suppressPolicyCheck)
 {
     ScopedUse u(barrier);
@@ -616,13 +650,13 @@ bool Queue::enqueue(TransactionContext* 
             policy->getPendingDequeues(dequeues);
         }
         //depending on policy, may have some dequeues that need to performed without holding the lock
-        for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));        
+        for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
     }
 
     if (inLastNodeFailure && persistLastNode){
         msg->forcePersistent();
     }
-       
+
     if (traceId.size()) {
         //copy on write: take deep copy of message before modifying it
         //as the frames may already be available for delivery on other
@@ -650,22 +684,20 @@ bool Queue::enqueue(TransactionContext* 
 void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg)
 {
     Mutex::ScopedLock locker(messageLock);
-    if (policy.get()) policy->enqueueAborted(msg);       
+    if (policy.get()) policy->enqueueAborted(msg);
 }
 
-// return true if store exists, 
+// return true if store exists,
 bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
 {
     ScopedUse u(barrier);
     if (!u.acquired) return false;
-
     {
         Mutex::ScopedLock locker(messageLock);
         if (!isEnqueued(msg)) return false;
-        if (!ctxt) { 
-            dequeued(msg);
-        }
+        if (!ctxt) dequeued(msg);
     }
+    if (!ctxt && broker) broker->getCluster().dequeue(msg); // Outside lock
     // This check prevents messages which have been forced persistent on one queue from dequeuing
     // from another on which no forcing has taken place and thus causing a store error.
     bool fp = msg.payload->isForcedPersistent();
@@ -682,8 +714,9 @@ bool Queue::dequeue(TransactionContext* 
 
 void Queue::dequeueCommitted(const QueuedMessage& msg)
 {
+    if (broker) broker->getCluster().dequeue(msg); // Outside lock
     Mutex::ScopedLock locker(messageLock);
-    dequeued(msg);    
+    dequeued(msg);
     if (mgmtObject != 0) {
         mgmtObject->inc_msgTxnDequeues();
         mgmtObject->inc_byteTxnDequeues(msg.payload->contentSize());
@@ -726,6 +759,7 @@ void Queue::create(const FieldTable& _se
         store->create(*this, _settings);
     }
     configure(_settings);
+    if (broker) broker->getCluster().create(*this);
 }
 
 
@@ -738,8 +772,8 @@ int getIntegerSetting(const qpid::framin
         return v->get<int>();
     } else if (v->convertsTo<std::string>()){
         std::string s = v->get<std::string>();
-        try { 
-            return boost::lexical_cast<int>(s); 
+        try {
+            return boost::lexical_cast<int>(s);
         } catch(const boost::bad_lexical_cast&) {
             QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << s);
             return 0;
@@ -757,8 +791,7 @@ void Queue::configure(const FieldTable& 
     if (eventMode && broker) {
         broker->getQueueEvents().observe(*this, eventMode == ENQUEUE_ONLY);
     }
-
-    if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK && 
+    if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK &&
         (!store || NullMessageStore::isNullStore(store) || (broker && !(broker->getQueueEvents().isSync())) )) {
         if ( NullMessageStore::isNullStore(store)) {
             QPID_LOG(warning, "Flow to disk not valid for non-persisted queue:" << getName());
@@ -796,7 +829,7 @@ void Queue::configure(const FieldTable& 
             QPID_LOG(debug, "Configured queue " <<  getName() << " as priority queue.");
         }
     }
-    
+
     persistLastNode= _settings.get(qpidPersistLastNode);
     if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node for: " << getName());
 
@@ -805,15 +838,15 @@ void Queue::configure(const FieldTable& 
     if (excludeList.size()) {
         split(traceExclude, excludeList, ", ");
     }
-    QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId 
+    QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId
              << "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << traceExclude.size() << " elements");
 
     FieldTable::ValuePtr p =_settings.get(qpidInsertSequenceNumbers);
     if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>());
 
     autoDeleteTimeout = getIntegerSetting(_settings, qpidAutoDeleteTimeout);
-    if (autoDeleteTimeout) 
-        QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout); 
+    if (autoDeleteTimeout)
+        QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout);
 
     if (mgmtObject != 0) {
         mgmtObject->set_arguments(ManagementAgent::toMap(_settings));
@@ -847,6 +880,7 @@ void Queue::destroyed()
     }
     if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>();
     notifyDeleted();
+    if (broker) broker->getCluster().destroy(*this);
 }
 
 void Queue::notifyDeleted()
@@ -881,9 +915,9 @@ const QueuePolicy* Queue::getPolicy()
     return policy.get();
 }
 
-uint64_t Queue::getPersistenceId() const 
-{ 
-    return persistenceId; 
+uint64_t Queue::getPersistenceId() const
+{
+    return persistenceId;
 }
 
 void Queue::setPersistenceId(uint64_t _persistenceId) const
@@ -897,11 +931,11 @@ void Queue::setPersistenceId(uint64_t _p
     persistenceId = _persistenceId;
 }
 
-void Queue::encode(Buffer& buffer) const 
+void Queue::encode(Buffer& buffer) const
 {
     buffer.putShortString(name);
     buffer.put(settings);
-    if (policy.get()) { 
+    if (policy.get()) {
         buffer.put(*policy);
     }
     buffer.putShortString(alternateExchange.get() ? alternateExchange->getName() : std::string(""));
@@ -953,7 +987,7 @@ boost::shared_ptr<Exchange> Queue::getAl
 
 void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue)
 {
-    if (broker.getQueues().destroyIf(queue->getName(), 
+    if (broker.getQueues().destroyIf(queue->getName(),
                                      boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) {
         QPID_LOG(debug, "Auto-deleting " << queue->getName());
         queue->destroyed();
@@ -965,7 +999,7 @@ struct AutoDeleteTask : qpid::sys::Timer
     Broker& broker;
     Queue::shared_ptr queue;
 
-    AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime) 
+    AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime)
         : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion"), broker(b), queue(q) {}
 
     void fire()
@@ -983,27 +1017,27 @@ void Queue::tryAutoDelete(Broker& broker
     if (queue->autoDeleteTimeout && queue->canAutoDelete()) {
         AbsTime time(now(), Duration(queue->autoDeleteTimeout * TIME_SEC));
         queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, time));
-        broker.getClusterTimer().add(queue->autoDeleteTask);        
+        broker.getClusterTimer().add(queue->autoDeleteTask);
         QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " initiated");
     } else {
         tryAutoDeleteImpl(broker, queue);
     }
 }
 
-bool Queue::isExclusiveOwner(const OwnershipToken* const o) const 
-{ 
+bool Queue::isExclusiveOwner(const OwnershipToken* const o) const
+{
     Mutex::ScopedLock locker(ownershipLock);
-    return o == owner; 
+    return o == owner;
 }
 
-void Queue::releaseExclusiveOwnership() 
-{ 
+void Queue::releaseExclusiveOwnership()
+{
     Mutex::ScopedLock locker(ownershipLock);
-    owner = 0; 
+    owner = 0;
 }
 
-bool Queue::setExclusiveOwner(const OwnershipToken* const o) 
-{ 
+bool Queue::setExclusiveOwner(const OwnershipToken* const o)
+{
     //reset auto deletion timer if necessary
     if (autoDeleteTimeout && autoDeleteTask) {
         autoDeleteTask->cancel();
@@ -1012,25 +1046,25 @@ bool Queue::setExclusiveOwner(const Owne
     if (owner) {
         return false;
     } else {
-        owner = o; 
+        owner = o;
         return true;
     }
 }
 
-bool Queue::hasExclusiveOwner() const 
-{ 
+bool Queue::hasExclusiveOwner() const
+{
     Mutex::ScopedLock locker(ownershipLock);
-    return owner != 0; 
+    return owner != 0;
 }
 
-bool Queue::hasExclusiveConsumer() const 
-{ 
-    return exclusive; 
+bool Queue::hasExclusiveConsumer() const
+{
+    return exclusive;
 }
 
 void Queue::setExternalQueueStore(ExternalQueueStore* inst) {
-    if (externalQueueStore!=inst && externalQueueStore) 
-        delete externalQueueStore; 
+    if (externalQueueStore!=inst && externalQueueStore)
+        delete externalQueueStore;
     externalQueueStore = inst;
 
     if (inst) {

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/QueuedMessage.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/QueuedMessage.h?rev=1073448&r1=1073447&r2=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/QueuedMessage.h (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/QueuedMessage.h Tue Feb 22 18:23:06 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -34,13 +34,12 @@ struct QueuedMessage
     framing::SequenceNumber position;
     Queue* queue;
 
-    QueuedMessage() : queue(0) {}
-    QueuedMessage(Queue* q, boost::intrusive_ptr<Message> msg, framing::SequenceNumber sn) : 
+    QueuedMessage(Queue* q=0) : position(0), queue(q) {}
+    QueuedMessage(Queue* q, boost::intrusive_ptr<Message> msg, framing::SequenceNumber sn) :
         payload(msg), position(sn), queue(q) {}
-    QueuedMessage(Queue* q) : queue(q) {}
-    
+
 };
-    inline bool operator<(const QueuedMessage& a, const QueuedMessage& b) { return a.position < b.position; } 
+    inline bool operator<(const QueuedMessage& a, const QueuedMessage& b) { return a.position < b.position; }
 
 }}
 

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/RecoverableExchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/RecoverableExchange.h?rev=1073448&r1=1073447&r2=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/RecoverableExchange.h (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/RecoverableExchange.h Tue Feb 22 18:23:06 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -44,6 +44,8 @@ public:
                       const std::string& routingKey,
                       qpid::framing::FieldTable& args) = 0;
     virtual ~RecoverableExchange() {};
+
+    virtual const std::string& getName() const = 0;
 };
 
 }}

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?rev=1073448&r1=1073447&r2=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Tue Feb 22 18:23:06 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -62,7 +62,7 @@ class RecoverableQueueImpl : public Reco
 public:
     RecoverableQueueImpl(const boost::shared_ptr<Queue>& _queue) : queue(_queue) {}
     ~RecoverableQueueImpl() {};
-    void setPersistenceId(uint64_t id);    
+    void setPersistenceId(uint64_t id);
 	uint64_t getPersistenceId() const;
     const std::string& getName() const;
     void setExternalQueueStore(ExternalQueueStore* inst);
@@ -80,6 +80,7 @@ public:
     RecoverableExchangeImpl(Exchange::shared_ptr _exchange, QueueRegistry& _queues) : exchange(_exchange), queues(_queues) {}
     void setPersistenceId(uint64_t id);
     void bind(const std::string& queue, const std::string& routingKey, qpid::framing::FieldTable& args);
+    const std::string& getName() const;
 };
 
 class RecoverableConfigImpl : public RecoverableConfig
@@ -133,7 +134,7 @@ RecoverableMessage::shared_ptr RecoveryM
     return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message));
 }
 
-RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid, 
+RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid,
                                                                            std::auto_ptr<TPCTransactionContext> txn)
 {
     DtxBuffer::shared_ptr buffer(new DtxBuffer());
@@ -202,7 +203,7 @@ void RecoverableQueueImpl::setPersistenc
 {
     queue->setPersistenceId(id);
 }
-       
+
 uint64_t RecoverableQueueImpl::getPersistenceId() const
 {
 	return queue->getPersistenceId();
@@ -212,7 +213,7 @@ const std::string& RecoverableQueueImpl:
 {
     return queue->getName();
 }
-    
+
 void RecoverableQueueImpl::setExternalQueueStore(ExternalQueueStore* inst)
 {
     queue->setExternalQueueStore(inst);
@@ -245,6 +246,11 @@ void RecoverableExchangeImpl::bind(const
     queue->bound(exchange->getName(), key, args);
 }
 
+const std::string& RecoverableExchangeImpl::getName() const
+{
+    return exchange->getName();
+}
+
 void RecoverableMessageImpl::dequeue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue)
 {
     buffer->enlist(TxOp::shared_ptr(new RecoveredDequeue(queue, msg)));

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp?rev=1073448&r1=1073447&r2=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp Tue Feb 22 18:23:06 2011
@@ -37,9 +37,11 @@ PollerDispatch::PollerDispatch(Cpg& c, b
       started(false)
 {}
     
-PollerDispatch::~PollerDispatch() {
-    if (started)
-        dispatchHandle.stopWatch();
+PollerDispatch::~PollerDispatch() { stop(); }
+
+void PollerDispatch::stop() {
+    if (started) dispatchHandle.stopWatch();
+    started = false;
 }
 
 void PollerDispatch::start() {
@@ -54,6 +56,7 @@ void PollerDispatch::dispatch(sys::Dispa
         h.rewatch();
     } catch (const std::exception& e) {
         QPID_LOG(critical, "Error in cluster dispatch: " << e.what());
+        stop();
         onError();
     }
 }

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/PollerDispatch.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/PollerDispatch.h?rev=1073448&r1=1073447&r2=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/PollerDispatch.h (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/PollerDispatch.h Tue Feb 22 18:23:06 2011
@@ -41,6 +41,7 @@ class PollerDispatch  {
     ~PollerDispatch();
 
     void start();
+    void stop();
 
   private:
     // Poller callbacks

Added: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.cpp?rev=1073448&view=auto
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.cpp (added)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.cpp Tue Feb 22 18:23:06 2011
@@ -0,0 +1,154 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Core.h"
+#include "BrokerHandler.h"
+#include "qpid/framing/ClusterMessageRoutingBody.h"
+#include "qpid/framing/ClusterMessageRoutedBody.h"
+#include "qpid/framing/ClusterMessageEnqueueBody.h"
+#include "qpid/framing/ClusterMessageDequeueBody.h"
+#include "qpid/framing/ClusterWiringCreateQueueBody.h"
+#include "qpid/framing/ClusterWiringCreateExchangeBody.h"
+#include "qpid/framing/ClusterWiringDestroyQueueBody.h"
+#include "qpid/framing/ClusterWiringDestroyExchangeBody.h"
+#include "qpid/framing/ClusterWiringBindBody.h"
+#include "qpid/framing/ClusterWiringUnbindBody.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/broker/QueuedMessage.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace cluster {
+
+using namespace framing;
+using namespace broker;
+
+namespace {
+// noReplicate means the current thread is handling a message
+// received from the cluster so it should not be replciated.
+QPID_TSS bool tssNoReplicate = false;
+
+// Routing ID of the message being routed in the current thread.
+// 0 if we are not currently routing a message.
+QPID_TSS RoutingId tssRoutingId = 0;
+}
+
+BrokerHandler::ScopedSuppressReplication::ScopedSuppressReplication() {
+    assert(!tssNoReplicate);
+    tssNoReplicate = true;
+}
+
+BrokerHandler::ScopedSuppressReplication::~ScopedSuppressReplication() {
+    assert(tssNoReplicate);
+    tssNoReplicate = false;
+}
+
+BrokerHandler::BrokerHandler(Core& c) : core(c) {}
+
+RoutingId BrokerHandler::nextRoutingId() {
+    RoutingId id = ++routingId;
+    if (id == 0) id = ++routingId; // Avoid 0 on wrap-around.
+    return id;
+}
+
+void BrokerHandler::routing(const boost::intrusive_ptr<Message>&) { }
+
+bool BrokerHandler::enqueue(Queue& queue, const boost::intrusive_ptr<Message>& msg)
+{
+    if (tssNoReplicate) return true;
+    if (!tssRoutingId) {             // This is the first enqueue, so send the message
+        tssRoutingId = nextRoutingId();
+        // FIXME aconway 2010-10-20: replicate message in fixed size buffers.
+        std::string data(msg->encodedSize(),char());
+        framing::Buffer buf(&data[0], data.size());
+        msg->encode(buf);
+        core.mcast(ClusterMessageRoutingBody(ProtocolVersion(), tssRoutingId, data));
+        core.getRoutingMap().put(tssRoutingId, msg);
+    }
+    core.mcast(ClusterMessageEnqueueBody(ProtocolVersion(), tssRoutingId, queue.getName()));
+    // TODO aconway 2010-10-21: configable option for strict (wait
+    // for CPG deliver to do local deliver) vs.  loose (local deliver
+    // immediately).
+    return false;
+}
+
+void BrokerHandler::routed(const boost::intrusive_ptr<Message>&) {
+    if (tssRoutingId) {             // we enqueued at least one message.
+        core.mcast(ClusterMessageRoutedBody(ProtocolVersion(), tssRoutingId));
+        // Note: routingMap is cleaned up on CPG delivery in MessageHandler.
+        tssRoutingId = 0;
+    }
+}
+
+void BrokerHandler::dequeue(const broker::QueuedMessage& qm) {
+    if (tssNoReplicate) return;
+    // FIXME aconway 2010-10-28: we also need to delay completion of the
+    // ack that caused this dequeue until self-delivery of the mcast below.
+    core.mcast(ClusterMessageDequeueBody(
+                   ProtocolVersion(), qm.queue->getName(), qm.position));
+}
+
+void BrokerHandler::create(const broker::Queue& q) {
+    if (tssNoReplicate) return;
+    std::string data(q.encodedSize(), '\0');
+    framing::Buffer buf(&data[0], data.size());
+    q.encode(buf);
+    core.mcast(ClusterWiringCreateQueueBody(ProtocolVersion(), data));
+}
+
+void BrokerHandler::destroy(const broker::Queue& q) {
+    if (tssNoReplicate) return;
+    core.mcast(ClusterWiringDestroyQueueBody(ProtocolVersion(), q.getName()));
+}
+
+void BrokerHandler::create(const broker::Exchange& ex) {
+    if (tssNoReplicate) return;
+    std::string data(ex.encodedSize(), '\0');
+    framing::Buffer buf(&data[0], data.size());
+    ex.encode(buf);
+    core.mcast(ClusterWiringCreateExchangeBody(ProtocolVersion(), data));
+}
+
+void BrokerHandler::destroy(const broker::Exchange& ex) {
+    if (tssNoReplicate) return;
+    core.mcast(ClusterWiringDestroyExchangeBody(ProtocolVersion(), ex.getName()));
+}
+
+void BrokerHandler::bind(const broker::Queue& q, const broker::Exchange& ex,
+                         const std::string& key, const framing::FieldTable& args)
+{
+    if (tssNoReplicate) return;
+    core.mcast(ClusterWiringBindBody(
+                   ProtocolVersion(), q.getName(), ex.getName(), key, args));
+}
+
+void BrokerHandler::unbind(const broker::Queue& q, const broker::Exchange& ex,
+                         const std::string& key, const framing::FieldTable& args)
+{
+    if (tssNoReplicate) return;
+    core.mcast(ClusterWiringUnbindBody(
+                   ProtocolVersion(), q.getName(), ex.getName(), key, args));
+}
+
+}} // namespace qpid::cluster

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.h?rev=1073448&view=auto
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.h (added)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.h Tue Feb 22 18:23:06 2011
@@ -0,0 +1,86 @@
+#ifndef QPID_CLUSTER_BROKERHANDLER_H
+#define QPID_CLUSTER_BROKERHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/Cluster.h"
+#include "qpid/sys/AtomicValue.h"
+
+namespace qpid {
+namespace cluster {
+class Core;
+
+// TODO aconway 2010-10-19: experimental cluster code.
+
+/**
+ * Implements broker::Cluster interface, handles events in broker code.
+ */
+class BrokerHandler : public broker::Cluster
+{
+  public:
+    /** Suppress replication while in scope.
+     * Used to prevent re-replication of messages received from the cluster.
+     */
+    struct ScopedSuppressReplication {
+        ScopedSuppressReplication();
+        ~ScopedSuppressReplication();
+    };
+
+    BrokerHandler(Core&);
+
+    // FIXME aconway 2010-10-20: implement all points.
+
+    // Messages
+
+    void routing(const boost::intrusive_ptr<broker::Message>&);
+    bool enqueue(broker::Queue&, const boost::intrusive_ptr<broker::Message>&);
+    void routed(const boost::intrusive_ptr<broker::Message>&);
+    void acquire(const broker::QueuedMessage&) {}
+    void release(const broker::QueuedMessage&) {}
+    void dequeue(const broker::QueuedMessage&);
+
+    // Consumers
+
+    void consume(const broker::Queue&, size_t) {}
+    void cancel(const broker::Queue&, size_t) {}
+
+    // Wiring
+
+    void create(const broker::Queue&);
+    void destroy(const broker::Queue&);
+    void create(const broker::Exchange&);
+    void destroy(const broker::Exchange&);
+    void bind(const broker::Queue&, const broker::Exchange&,
+              const std::string&, const framing::FieldTable&);
+    void unbind(const broker::Queue&, const broker::Exchange&,
+                const std::string&, const framing::FieldTable&);
+
+
+  private:
+    uint32_t nextRoutingId();
+
+    Core& core;
+    sys::AtomicValue<uint32_t> routingId;
+};
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_BROKERHANDLER_H*/

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/BrokerHandler.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp?rev=1073448&view=auto
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp (added)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp Tue Feb 22 18:23:06 2011
@@ -0,0 +1,65 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <qpid/Options.h>
+#include <qpid/broker/Broker.h>
+#include "Core.h"
+
+namespace qpid {
+namespace cluster {
+using broker::Broker;
+
+// TODO aconway 2010-10-19: experimental new cluster code.
+
+/**
+ * Plugin for the cluster.
+ */
+struct Cluster2Plugin : public Plugin {
+    struct Opts : public Options {
+        Core::Settings& settings;
+        Opts(Core::Settings& s) : Options("Cluster Options"), settings(s) {
+            addOptions()
+                ("cluster2-name", optValue(settings.name, "NAME"), "Name of cluster to join");
+            // TODO aconway 2010-10-19: copy across other options from ClusterPlugin.h
+        }
+    };
+
+    Core::Settings settings;
+    Opts options;
+    Core* core;                 // Core deletes itself on shutdown.
+
+    Cluster2Plugin() : options(settings), core(0) {}
+
+    Options* getOptions() { return &options; }
+
+    void earlyInitialize(Plugin::Target& target) {
+        if (settings.name.empty()) return;
+        Broker* broker = dynamic_cast<Broker*>(&target);
+        if (!broker) return;
+        core = new Core(settings, *broker);
+    }
+
+    void initialize(Plugin::Target& target) {
+        Broker* broker = dynamic_cast<Broker*>(&target);
+        if (broker && core) core->initialize();
+    }
+};
+
+static Cluster2Plugin instance; // Static initialization.
+
+}} // namespace qpid::cluster

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/Core.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/Core.cpp?rev=1073448&view=auto
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/Core.cpp (added)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/Core.cpp Tue Feb 22 18:23:06 2011
@@ -0,0 +1,75 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Core.h"
+#include "EventHandler.h"
+#include "BrokerHandler.h"
+#include "WiringHandler.h"
+#include "MessageHandler.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/SignalHandler.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/log/Statement.h"
+#include <sys/uio.h>            // For iovec
+
+namespace qpid {
+namespace cluster {
+
+Core::Core(const Settings& s, broker::Broker& b) :
+    broker(b),
+    eventHandler(new EventHandler(*this))
+{
+    eventHandler->add(boost::shared_ptr<HandlerBase>(new WiringHandler(*eventHandler)));
+    eventHandler->add(boost::shared_ptr<HandlerBase>(new MessageHandler(*eventHandler)));
+
+    std::auto_ptr<BrokerHandler> bh(new BrokerHandler(*this));
+    brokerHandler = bh.get();
+    // BrokerHandler belongs to Broker
+    broker.setCluster(std::auto_ptr<broker::Cluster>(bh));
+    eventHandler->start();
+    eventHandler->getCpg().join(s.name);
+    // TODO aconway 2010-11-18: logging standards
+    QPID_LOG(notice, "cluster: joined " << s.name << ", member-id="<< eventHandler->getSelf());
+}
+
+void Core::initialize() {}
+
+void Core::fatal() {
+    // FIXME aconway 2010-10-20: error handling
+    assert(0);
+    broker::SignalHandler::shutdown();
+}
+
+void Core::mcast(const framing::AMQBody& body) {
+    QPID_LOG(trace, "cluster multicast: " << body);
+    // FIXME aconway 2010-10-20: use Multicaster, or bring in its features.
+    // here we multicast Frames rather than Events.
+    framing::AMQFrame f(body);
+    std::string data(f.encodedSize(), char());
+    framing::Buffer buf(&data[0], data.size());
+    f.encode(buf);
+    iovec iov = { buf.getPointer(), buf.getSize() };
+    while (!eventHandler->getCpg().mcast(&iov, 1))
+        ::usleep(1000);      // FIXME aconway 2010-10-20: flow control
+}
+
+}} // namespace qpid::cluster

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/Core.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/Core.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/Core.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/Core.h?rev=1073448&view=auto
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/Core.h (added)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/Core.h Tue Feb 22 18:23:06 2011
@@ -0,0 +1,93 @@
+#ifndef QPID_CLUSTER_CORE_H
+#define QPID_CLUSTER_CORE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include <memory>
+#include "LockedMap.h"
+#include "qpid/cluster/types.h"
+#include "qpid/cluster/Cpg.h"
+#include "qpid/broker/QueuedMessage.h"
+
+// TODO aconway 2010-10-19: experimental cluster code.
+
+namespace qpid {
+
+namespace framing{
+class AMQBody;
+}
+
+namespace broker {
+class Broker;
+}
+
+namespace cluster {
+class EventHandler;
+class BrokerHandler;
+
+/**
+ * Cluster core state machine.
+ * Holds together the various objects that implement cluster behavior,
+ * and holds state that is shared by multiple components.
+ *
+ * Thread safe: called from broker connection threads and CPG dispatch threads.
+ */
+class Core
+{
+  public:
+    /** Configuration settings */
+    struct Settings {
+        std::string name;
+    };
+
+    typedef LockedMap<RoutingId, boost::intrusive_ptr<broker::Message> > RoutingMap;
+
+    /** Constructed during Plugin::earlyInitialize() */
+    Core(const Settings&, broker::Broker&);
+
+    /** Called during Plugin::initialize() */
+    void initialize();
+
+    /** Shut down broker due to fatal error. Caller should log a critical message */
+    void fatal();
+
+    /** Multicast an event */
+    void mcast(const framing::AMQBody&);
+
+    broker::Broker& getBroker() { return broker; }
+    EventHandler& getEventHandler() { return *eventHandler; }
+    BrokerHandler& getBrokerHandler() { return *brokerHandler; }
+
+    /** Map of messages that are currently being routed.
+     * Used to pass messages being routed from BrokerHandler to MessageHandler
+     */
+    RoutingMap& getRoutingMap() { return routingMap; }
+  private:
+    broker::Broker& broker;
+    std::auto_ptr<EventHandler> eventHandler; // Handles CPG events.
+    BrokerHandler* brokerHandler; // Handles broker events.
+    RoutingMap routingMap;
+};
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_CORE_H*/

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/Core.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/Core.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp?rev=1073448&view=auto
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp (added)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp Tue Feb 22 18:23:06 2011
@@ -0,0 +1,123 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Core.h"
+#include "EventHandler.h"
+#include "HandlerBase.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/cluster/types.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/AllInvoker.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace cluster {
+
+EventHandler::EventHandler(Core& c) :
+    core(c),
+    cpg(*this),                 // FIXME aconway 2010-10-20: belongs on Core.
+    dispatcher(cpg, core.getBroker().getPoller(), boost::bind(&Core::fatal, &core)),
+    self(cpg.self())
+{}
+
+EventHandler::~EventHandler() {}
+
+void EventHandler::add(const boost::shared_ptr<HandlerBase>& handler) {
+    handlers.push_back(handler);
+}
+
+void EventHandler::start() {
+    dispatcher.start();
+}
+
+// Print member ID or "self" if member is self
+struct PrettyId {
+    MemberId id, self;
+    PrettyId(const MemberId& id_, const MemberId& self_) : id(id_), self(self_) {}
+};
+
+std::ostream& operator<<(std::ostream& o, const PrettyId& id) {
+    if (id.id == id.self) return o << "self";
+    else return o << id.id;
+}
+
+// Deliver CPG message.
+void EventHandler::deliver(
+    cpg_handle_t /*handle*/,
+    const cpg_name* /*group*/,
+    uint32_t nodeid,
+    uint32_t pid,
+    void* msg,
+    int msg_len)
+{
+    sender = MemberId(nodeid, pid);
+    framing::Buffer buf(static_cast<char*>(msg), msg_len);
+    framing::AMQFrame frame;
+    while (buf.available()) {
+        frame.decode(buf);
+        assert(frame.getBody());
+        QPID_LOG(trace, "cluster deliver: " << PrettyId(sender, self) << " "
+                 << *frame.getBody());
+        try {
+            invoke(*frame.getBody());
+        } catch (const std::exception& e) {
+            // Note: exceptions are assumed to be survivable,
+            // fatal errors should log a message and call Core::fatal.
+            QPID_LOG(error, e.what());
+        }
+    }
+}
+
+void EventHandler::invoke(const framing::AMQBody& body) {
+    for (Handlers::iterator i = handlers.begin(); i != handlers.end(); ++i)
+        if ((*i)->invoke(body)) return;
+    QPID_LOG(error, "Cluster received unknown control: " << body );
+    assert(0);                  // Error handling
+}
+
+struct PrintAddrs {
+    PrintAddrs(const cpg_address* a, int n ) : addrs(a), count(n) {}
+    const cpg_address* addrs;
+    int count;
+};
+
+std::ostream& operator<<(std::ostream& o, const PrintAddrs& pa) {
+    for (const cpg_address* a = pa.addrs; a != pa.addrs+pa.count; ++a)
+        o << MemberId(*a) << " ";
+    return o;
+}
+
+// CPG config-change callback.
+void EventHandler::configChange (
+    cpg_handle_t /*handle*/,
+    const cpg_name */*group*/,
+    const cpg_address *members, int nMembers,
+    const cpg_address *left, int nLeft,
+    const cpg_address *joined, int nJoined)
+{
+    // FIXME aconway 2010-10-20: TODO
+    QPID_LOG(notice, "cluster: new membership: " << PrintAddrs(members, nMembers));
+    QPID_LOG_IF(notice, nLeft, "cluster:   members left: " << PrintAddrs(left, nLeft));
+    QPID_LOG_IF(notice, nJoined, "cluster:   members joined: " << PrintAddrs(joined, nJoined));
+}
+
+}} // namespace qpid::cluster

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/EventHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/EventHandler.h?rev=1073448&view=auto
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/EventHandler.h (added)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/EventHandler.h Tue Feb 22 18:23:06 2011
@@ -0,0 +1,95 @@
+#ifndef QPID_CLUSTER_EVENTHANDLER_H
+#define QPID_CLUSTER_EVENTHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+// TODO aconway 2010-10-19: experimental cluster code.
+
+#include "qpid/cluster/Cpg.h"
+#include "qpid/cluster/PollerDispatch.h"
+#include "qpid/cluster/types.h"
+#include <boost/shared_ptr.hpp>
+#include <vector>
+
+namespace qpid {
+
+namespace framing {
+class AMQBody;
+}
+
+namespace cluster {
+class Core;
+class HandlerBase;
+
+/**
+ * Dispatch events received from a CPG group.
+ * A container for Handler objects that handle specific cluster.xml classes.
+ * Thread unsafe: only called in its own CPG deliver thread context.
+ */
+class EventHandler : public Cpg::Handler
+{
+  public:
+    EventHandler(Core&);
+    ~EventHandler();
+
+    /** Add a handler */
+    void add(const boost::shared_ptr<HandlerBase>&);
+
+    /** Start polling */
+    void start();
+
+    void deliver( // CPG deliver callback.
+        cpg_handle_t /*handle*/,
+        const struct cpg_name *group,
+        uint32_t /*nodeid*/,
+        uint32_t /*pid*/,
+        void* /*msg*/,
+        int /*msg_len*/);
+
+    void configChange( // CPG config change callback.
+        cpg_handle_t /*handle*/,
+        const struct cpg_name */*group*/,
+        const struct cpg_address */*members*/, int /*nMembers*/,
+        const struct cpg_address */*left*/, int /*nLeft*/,
+        const struct cpg_address */*joined*/, int /*nJoined*/
+    );
+
+    MemberId getSender() { return sender; }
+    MemberId getSelf() { return self; }
+    Core& getCore() { return core; }
+    Cpg& getCpg() { return cpg; }
+
+  private:
+    void invoke(const framing::AMQBody& body);
+
+    Core& core;
+    Cpg cpg;
+    PollerDispatch dispatcher;
+    MemberId sender;              // sender of current event.
+    MemberId self;
+
+    typedef std::vector<boost::shared_ptr<HandlerBase> > Handlers;
+    Handlers handlers;
+};
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_EVENTHANDLER_H*/

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/EventHandler.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/EventHandler.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/HandlerBase.cpp (from r1073372, qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/QueuedMessage.h)
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/HandlerBase.cpp?p2=qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/HandlerBase.cpp&p1=qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/QueuedMessage.h&r1=1073372&r2=1073448&rev=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/QueuedMessage.h (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/HandlerBase.cpp Tue Feb 22 18:23:06 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,31 +18,19 @@
  * under the License.
  *
  */
-#ifndef _QueuedMessage_
-#define _QueuedMessage_
-
-#include "qpid/broker/Message.h"
+#include "HandlerBase.h"
+#include "EventHandler.h"
 
 namespace qpid {
-namespace broker {
+namespace cluster {
 
-class Queue;
+HandlerBase::HandlerBase(EventHandler& eh) : eventHandler(eh) {}
 
-struct QueuedMessage
-{
-    boost::intrusive_ptr<Message> payload;
-    framing::SequenceNumber position;
-    Queue* queue;
+HandlerBase::~HandlerBase() {}
 
-    QueuedMessage() : queue(0) {}
-    QueuedMessage(Queue* q, boost::intrusive_ptr<Message> msg, framing::SequenceNumber sn) : 
-        payload(msg), position(sn), queue(q) {}
-    QueuedMessage(Queue* q) : queue(q) {}
-    
-};
-    inline bool operator<(const QueuedMessage& a, const QueuedMessage& b) { return a.position < b.position; } 
+MemberId HandlerBase::sender() { return eventHandler.getSender(); }
 
-}}
+MemberId HandlerBase::self() { return eventHandler.getSelf(); }
 
 
-#endif
+}} // namespace qpid::cluster

Copied: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h (from r1073372, qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/QueuedMessage.h)
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h?p2=qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h&p1=qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/QueuedMessage.h&r1=1073372&r2=1073448&rev=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/broker/QueuedMessage.h (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/HandlerBase.h Tue Feb 22 18:23:06 2011
@@ -1,3 +1,6 @@
+#ifndef QPID_CLUSTER_HANDLERBASE_H
+#define QPID_CLUSTER_HANDLERBASE_H
+
 /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -7,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,31 +21,34 @@
  * under the License.
  *
  */
-#ifndef _QueuedMessage_
-#define _QueuedMessage_
-
-#include "qpid/broker/Message.h"
+#include "qpid/cluster/types.h"
 
 namespace qpid {
-namespace broker {
 
-class Queue;
+namespace framing {
+class AMQBody;
+}
 
-struct QueuedMessage
+namespace cluster {
+class EventHandler;
+
+/**
+ * Base class for handlers of events, children of the EventHandler.
+ */
+class HandlerBase
 {
-    boost::intrusive_ptr<Message> payload;
-    framing::SequenceNumber position;
-    Queue* queue;
-
-    QueuedMessage() : queue(0) {}
-    QueuedMessage(Queue* q, boost::intrusive_ptr<Message> msg, framing::SequenceNumber sn) : 
-        payload(msg), position(sn), queue(q) {}
-    QueuedMessage(Queue* q) : queue(q) {}
-    
+  public:
+    HandlerBase(EventHandler&);
+    virtual ~HandlerBase();
+
+    virtual bool invoke(const framing::AMQBody& body) = 0;
+
+  protected:
+    EventHandler& eventHandler;
+    MemberId sender();
+    MemberId self();
 };
-    inline bool operator<(const QueuedMessage& a, const QueuedMessage& b) { return a.position < b.position; } 
-
-}}
 
+}} // namespace qpid::cluster
 
-#endif
+#endif  /*!QPID_CLUSTER_HANDLERBASE_H*/

Added: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/LockedMap.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/LockedMap.h?rev=1073448&view=auto
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/LockedMap.h (added)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/LockedMap.h Tue Feb 22 18:23:06 2011
@@ -0,0 +1,73 @@
+#ifndef QPID_CLUSTER_LOCKEDMAP_H
+#define QPID_CLUSTER_LOCKEDMAP_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Mutex.h"
+#include <map>
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * A reader-writer locked thread safe map.
+ */
+template <class Key, class Value>
+class LockedMap
+{
+  public:
+    /** Get value associated with key, returns Value() if none. */
+    Value get(const Key& key) const {
+        sys::RWlock::ScopedRlock r(lock);
+        typename Map::const_iterator i = map.find(key);
+        if (i == map.end()) return Value();
+        else return i->second;
+    }
+
+    /** Associate value with key, overwriting any previous value for key. */
+    void put(const Key& key, const Value& value) {
+        sys::RWlock::ScopedWlock w(lock);
+        map[key] = value;
+    }
+
+    /** Associate value with key if there is not already a value associated with key.
+     * Returns true if the value was added.
+     */
+    bool add(const Key& key, const Value& value) {
+        sys::RWlock::ScopedWlock w(lock);
+        return map.insert(key, value).second;
+    }
+
+    /** Erase the value associated with key if any. Return true if a value was erased. */
+    bool erase(const Key& key) {
+        sys::RWlock::ScopedWlock w(lock);
+        return map.erase(key);
+    }
+
+  private:
+    typedef std::map<Key, Value> Map;
+    Map map;
+    mutable sys::RWlock lock;
+};
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_LOCKEDMAP_H*/

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/LockedMap.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/LockedMap.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org