You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2011/02/16 22:38:38 UTC

svn commit: r1071410 - in /qpid/branches/qpid-2935/qpid/cpp/src/qpid: broker/ cluster/ sys/

Author: aconway
Date: Wed Feb 16 21:38:38 2011
New Revision: 1071410

URL: http://svn.apache.org/viewvc?rev=1071410&view=rev
Log:
Merge branch 'set-in-cluster-early-init' into qpid-2935

Modified:
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.h
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Connection.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Link.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/Event.h
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/EventFrame.h
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/ClusterSafe.cpp
    qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/ClusterSafe.h

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1071410&r1=1071409&r2=1071410&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.cpp Wed Feb 16 21:38:38 2011
@@ -60,7 +60,6 @@
 #include "qpid/StringUtils.h"
 #include "qpid/Url.h"
 #include "qpid/Version.h"
-#include "qpid/sys/ClusterSafe.h"
 
 #include <boost/bind.hpp>
 #include <boost/format.hpp>
@@ -171,8 +170,9 @@ Broker::Broker(const Broker::Options& co
             conf.replayHardLimit*1024),
         *this),
     queueCleaner(queues, timer),
-    queueEvents(poller,!conf.asyncQueueEvents), 
+    queueEvents(poller,!conf.asyncQueueEvents),
     recovery(true),
+    inCluster(false),
     clusterUpdatee(false),
     expiryPolicy(new ExpiryPolicy),
     connectionCounter(conf.maxConnections),
@@ -230,8 +230,8 @@ Broker::Broker(const Broker::Options& co
     // Early-Initialize plugins
     Plugin::earlyInitAll(*this);
 
-    /** todo KAG - remove once cluster support for flow control done + (and ClusterSafe.h include above). */
-    if (sys::isCluster()) {
+    /** todo KAG - remove once cluster support for flow control done */
+    if (isInCluster()) {
         QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers - queue flow control disabled by default.");
         QueueFlowLimit::setDefaults(0, 0, 0);
     } else {
@@ -239,7 +239,7 @@ Broker::Broker(const Broker::Options& co
     }
 
     // If no plugin store module registered itself, set up the null store.
-    if (NullMessageStore::isNullStore(store.get())) 
+    if (NullMessageStore::isNullStore(store.get()))
         setStore();
 
     exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
@@ -358,14 +358,14 @@ void Broker::run() {
         Dispatcher d(poller);
         int numIOThreads = config.workerThreads;
         std::vector<Thread> t(numIOThreads-1);
-        
+
         // Run n-1 io threads
         for (int i=0; i<numIOThreads-1; ++i)
             t[i] = Thread(d);
-        
+
         // Run final thread
         d.run();
-        
+
         // Now wait for n-1 io threads to exit
         for (int i=0; i<numIOThreads-1; ++i) {
             t[i].join();
@@ -412,9 +412,9 @@ Manageable::status_t Broker::ManagementM
     {
     case _qmf::Broker::METHOD_ECHO :
         QPID_LOG (debug, "Broker::echo("
-                  << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_sequence 
-                  << ", " 
-                  << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_body 
+                  << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_sequence
+                  << ", "
+                  << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_body
                   << ")");
         status = Manageable::STATUS_OK;
         break;
@@ -479,7 +479,7 @@ std::string Broker::getLogLevel()
     const std::vector<std::string>& selectors = qpid::log::Logger::instance().getOptions().selectors;
     for (std::vector<std::string>::const_iterator i = selectors.begin(); i != selectors.end(); ++i) {
         if (i != selectors.begin()) level += std::string(",");
-        level += *i;        
+        level += *i;
     }
     return level;
 }

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.h?rev=1071410&r1=1071409&r2=1071410&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Broker.h Wed Feb 16 21:38:38 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -57,7 +57,7 @@
 #include <string>
 #include <vector>
 
-namespace qpid { 
+namespace qpid {
 
 namespace sys {
     class ProtocolFactory;
@@ -80,7 +80,7 @@ struct NoSuchTransportException : qpid::
 };
 
 /**
- * A broker instance. 
+ * A broker instance.
  */
 class Broker : public sys::Runnable, public Plugin::Target,
                public management::Manageable,
@@ -121,25 +121,25 @@ public:
       private:
         std::string getHome();
     };
-    
+
     class ConnectionCounter {
             int maxConnections;
             int connectionCount;
             sys::Mutex connectionCountLock;
         public:
             ConnectionCounter(int mc): maxConnections(mc),connectionCount(0) {};
-            void inc_connectionCount() {    
-                sys::ScopedLock<sys::Mutex> l(connectionCountLock); 
+            void inc_connectionCount() {
+                sys::ScopedLock<sys::Mutex> l(connectionCountLock);
                 connectionCount++;
-            } 
-            void dec_connectionCount() {    
-                sys::ScopedLock<sys::Mutex> l(connectionCountLock); 
+            }
+            void dec_connectionCount() {
+                sys::ScopedLock<sys::Mutex> l(connectionCountLock);
                 connectionCount--;
             }
             bool allowConnection() {
-                sys::ScopedLock<sys::Mutex> l(connectionCountLock); 
+                sys::ScopedLock<sys::Mutex> l(connectionCountLock);
                 return (maxConnections <= connectionCount);
-            } 
+            }
     };
 
   private:
@@ -177,10 +177,10 @@ public:
                            const boost::intrusive_ptr<Message>& msg);
     std::string federationTag;
     bool recovery;
-    bool clusterUpdatee;
+    bool inCluster, clusterUpdatee;
     boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
     ConnectionCounter connectionCounter;
-    
+
   public:
     virtual ~Broker();
 
@@ -236,7 +236,7 @@ public:
     QPID_BROKER_EXTERN void accept();
 
     /** Create a connection to another broker. */
-    void connect(const std::string& host, uint16_t port, 
+    void connect(const std::string& host, uint16_t port,
                  const std::string& transport,
                  boost::function2<void, int, std::string> failed,
                  sys::ConnectionCodec::Factory* =0);
@@ -248,9 +248,9 @@ public:
     /** Move messages from one queue to another.
         A zero quantity means to move all messages
     */
-    uint32_t queueMoveMessages( const std::string& srcQueue, 
+    uint32_t queueMoveMessages( const std::string& srcQueue,
 			    const std::string& destQueue,
-			    uint32_t  qty); 
+			    uint32_t  qty);
 
     boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory(const std::string& name = TCP_TRANSPORT) const;
 
@@ -274,11 +274,20 @@ public:
     void setRecovery(bool set) { recovery = set; }
     bool getRecovery() const { return recovery; }
 
-    void setClusterUpdatee(bool set) { clusterUpdatee = set; }
+    /** True of this broker is part of a cluster.
+     * Only valid after early initialization of plugins is complete.
+     */
+    bool isInCluster() const { return inCluster; }
+    void setInCluster(bool set) { inCluster = set; }
+
+    /** True if this broker is joining a cluster and in the process of
+     * receiving a state update.
+     */
     bool isClusterUpdatee() const { return clusterUpdatee; }
+    void setClusterUpdatee(bool set) { clusterUpdatee = set; }
 
     management::ManagementAgent* getManagementAgent() { return managementAgent.get(); }
-    
+
     ConnectionCounter& getConnectionCounter() {return connectionCounter;}
 
     /**

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Connection.cpp?rev=1071410&r1=1071409&r2=1071410&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Connection.cpp Wed Feb 16 21:38:38 2011
@@ -278,8 +278,7 @@ void Connection::setUserId(const string&
     ConnectionState::setUserId(userId);
     // In a cluster, the cluster code will raise the connect event
     // when the connection is replicated to the cluster.
-    if (!sys::isCluster())
-        raiseConnectEvent();
+    if (!broker.isInCluster()) raiseConnectEvent();
 }
 
 void Connection::raiseConnectEvent() {

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Link.cpp?rev=1071410&r1=1071409&r2=1071410&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/Link.cpp Wed Feb 16 21:38:38 2011
@@ -134,7 +134,7 @@ void Link::established ()
     QPID_LOG (info, "Inter-broker link established to " << addr.str());
 
     // Don't raise the management event in a cluster, other members wont't get this call.
-    if (!sys::isCluster()) 
+    if (broker && !broker->isInCluster())
         agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str()));
 
     {
@@ -159,7 +159,7 @@ void Link::closed (int, std::string text
         stringstream addr;
         addr << host << ":" << port;
         QPID_LOG (warning, "Inter-broker link disconnected from " << addr.str());
-        if (!sys::isCluster())
+        if (broker && !broker->isInCluster())
             agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
     }
 

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp?rev=1071410&r1=1071409&r2=1071410&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp Wed Feb 16 21:38:38 2011
@@ -320,11 +320,10 @@ std::auto_ptr<QueueFlowLimit> QueueFlowL
             return std::auto_ptr<QueueFlowLimit>();
         }
         /** todo KAG - remove once cluster support for flow control done. */
-        if (sys::isCluster()) {
-            if (queue) {
-                QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers - queue flow control disabled for queue "
-                         << queue->getName());
-            }
+        // TODO aconway 2011-02-16: is queue==0 only in tests?
+        if (queue && queue->getBroker() && queue->getBroker()->isInCluster()) {
+            QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers - queue flow control disabled for queue "
+                     << queue->getName());
             return std::auto_ptr<QueueFlowLimit>();
         }
         return std::auto_ptr<QueueFlowLimit>(new QueueFlowLimit(queue, flowStopCount, flowResumeCount,
@@ -337,11 +336,9 @@ std::auto_ptr<QueueFlowLimit> QueueFlowL
         uint64_t flowResumeSize = (uint64_t)(maxByteCount * (defaultFlowResumeRatio/100.0));
 
         /** todo KAG - remove once cluster support for flow control done. */
-        if (sys::isCluster()) {
-            if (queue) {
-                QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers - queue flow control disabled for queue "
-                         << queue->getName());
-            }
+        if (queue && queue->getBroker() && queue->getBroker()->isInCluster()) {
+            QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers - queue flow control disabled for queue "
+                     << queue->getName());
             return std::auto_ptr<QueueFlowLimit>();
         }
 

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=1071410&r1=1071409&r2=1071410&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed Feb 16 21:38:38 2011
@@ -36,28 +36,28 @@
  *
  * IMPORTANT NOTE: any time code is added to the broker that uses timers,
  * the cluster may need to be updated to take account of this.
- * 
+ *
  *
  * USE OF TIMESTAMPS IN THE BROKER
- *  
+ *
  * The following are the current areas where broker uses timers or timestamps:
- * 
+ *
  * - Producer flow control: broker::SemanticState uses
  *   connection::getClusterOrderOutput.  a FrameHandler that sends
  *   frames to the client via the cluster. Used by broker::SessionState
- *   
+ *
  * - QueueCleaner, Message TTL: uses ExpiryPolicy, which is
  *   implemented by cluster::ExpiryPolicy.
- * 
+ *
  * - Connection heartbeat: sends connection controls, not part of
  *   session command counting so OK to ignore.
- * 
+ *
  * - LinkRegistry: only cluster elder is ever active for links.
- * 
+ *
  * - management::ManagementBroker: uses MessageHandler supplied by  cluster
  *   to send messages to the broker via the cluster.
- *   
- * - Dtx: not yet supported with cluster.  
+ *
+ * - Dtx: not yet supported with cluster.
  *
  * cluster::ExpiryPolicy implements the strategy for message expiry.
  *
@@ -65,16 +65,16 @@
  * Used for periodic management events.
  *
  * <h1>CLUSTER PROTOCOL OVERVIEW</h1>
- * 
+ *
  * Messages sent to/from CPG are called Events.
  *
  * An Event carries a ConnectionId, which includes a MemberId and a
  * connection number.
- * 
+ *
  * Events are either
  *  - Connection events: non-0 connection number and are associated with a connection.
  *  - Cluster Events: 0 connection number, are not associated with a connection.
- * 
+ *
  * Events are further categorized as:
  *  - Control: carries method frame(s) that affect cluster behavior.
  *  - Data: carries raw data received from a client connection.
@@ -214,7 +214,7 @@ struct ClusterDispatcher : public framin
     {
         cluster.initialStatus(
             member, version, active, clusterId,
-            framing::cluster::StoreState(storeState), shutdownId, 
+            framing::cluster::StoreState(storeState), shutdownId,
             firstConfig, l);
     }
     void ready(const std::string& url) {
@@ -244,7 +244,7 @@ struct ClusterDispatcher : public framin
 };
 
 Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
-    settings(set), 
+    settings(set),
     broker(b),
     mgmtObject(0),
     poller(b.getPoller()),
@@ -279,6 +279,8 @@ Cluster::Cluster(const ClusterSettings& 
     updateClosed(false),
     error(*this)
 {
+    broker.setInCluster(true);
+
     // We give ownership of the timer to the broker and keep a plain pointer.
     // This is OK as it means the timer has the same lifetime as the broker.
     timer = new ClusterTimer(*this);
@@ -299,7 +301,7 @@ Cluster::Cluster(const ClusterSettings& 
     // Load my store status before we go into initialization
     if (! broker::NullMessageStore::isNullStore(&broker.getStore())) {
         store.load();
-        clusterId = store.getClusterId(); 
+        clusterId = store.getClusterId();
         QPID_LOG(notice, "Cluster store state: " << store)
             }
     cpg.join(name);
@@ -360,14 +362,14 @@ void Cluster::addShadowConnection(const 
     // Safe to use connections here because we're pre-catchup, stalled
     // and discarding, so deliveredFrame is not processing any
     // connection events.
-    assert(discarding);         
+    assert(discarding);
     pair<ConnectionMap::iterator, bool> ib
         = connections.insert(ConnectionMap::value_type(c->getId(), c));
     assert(ib.second);
 }
 
 void Cluster::erase(const ConnectionId& id) {
-    Lock l(lock);    
+    Lock l(lock);
     erase(id,l);
 }
 
@@ -393,9 +395,9 @@ std::vector<Url> Cluster::getUrls() cons
 
 std::vector<Url> Cluster::getUrls(Lock&) const {
     return map.memberUrls();
-} 
+}
 
-void Cluster::leave() { 
+void Cluster::leave() {
     Lock l(lock);
     leave(l);
 }
@@ -405,7 +407,7 @@ void Cluster::leave() { 
         QPID_LOG(warning, *this << " error leaving cluster: " << e.what()); \
     } do {} while(0)
 
-void Cluster::leave(Lock&) { 
+void Cluster::leave(Lock&) {
     if (state != LEFT) {
         state = LEFT;
         QPID_LOG(notice, *this << " leaving cluster " << name);
@@ -424,7 +426,7 @@ void Cluster::deliver(
     uint32_t nodeid,
     uint32_t pid,
     void* msg,
-    int msg_len) 
+    int msg_len)
 {
     MemberId from(nodeid, pid);
     framing::Buffer buf(static_cast<char*>(msg), msg_len);
@@ -455,7 +457,7 @@ void Cluster::deliveredEvent(const Event
         EventFrame ef(e, e.getFrame());
         // Stop the deliverEventQueue on update offers.
         // This preserves the connection decoder fragments for an update.
-        // Only do this for the two brokers that are directly involved in this 
+        // Only do this for the two brokers that are directly involved in this
         // offer: the one making the offer, or the one receiving it.
         const ClusterUpdateOfferBody* offer = castUpdateOffer(ef.frame.getBody());
         if (offer && ( e.getMemberId() == self || MemberId(offer->getUpdatee()) == self) ) {
@@ -465,7 +467,7 @@ void Cluster::deliveredEvent(const Event
         }
         deliverFrame(ef);
     }
-    else if(!discarding) { 
+    else if(!discarding) {
         if (e.isControl())
             deliverFrame(EventFrame(e, e.getFrame()));
         else {
@@ -507,7 +509,7 @@ void Cluster::deliveredFrame(const Event
         // the event queue.
         e.frame = AMQFrame(
             ClusterRetractOfferBody(ProtocolVersion(), offer->getUpdatee()));
-        deliverEventQueue.start(); 
+        deliverEventQueue.start();
     }
     // Process each frame through the error checker.
     if (error.isUnresolved()) {
@@ -515,7 +517,7 @@ void Cluster::deliveredFrame(const Event
         while (error.canProcess())  // There is a frame ready to process.
             processFrame(error.getNext(), l);
     }
-    else 
+    else
         processFrame(e, l);
 }
 
@@ -577,7 +579,7 @@ Cluster::ConnectionVector Cluster::getCo
 }
 
 // CPG config-change callback.
-void Cluster::configChange ( 
+void Cluster::configChange (
     cpg_handle_t /*handle*/,
     const cpg_name */*group*/,
     const cpg_address *members, int nMembers,
@@ -607,7 +609,7 @@ void Cluster::setReady(Lock&) {
 }
 
 // Set the management status from the Cluster::state.
-// 
+//
 // NOTE: Management updates are sent based on property changes.  In
 // order to keep consistency across the cluster, we touch the local
 // management status property even if it is locally unchanged for any
@@ -618,7 +620,7 @@ void Cluster::setMgmtStatus(Lock&) {
 }
 
 void Cluster::initMapCompleted(Lock& l) {
-    // Called on completion of the initial status map.    
+    // Called on completion of the initial status map.
     QPID_LOG(debug, *this << " initial status map complete. ");
     setMgmtStatus(l);
     if (state == PRE_INIT) {
@@ -701,8 +703,8 @@ void Cluster::configChange(const MemberI
     if (initMap.isResendNeeded()) {
         mcast.mcastControl(
             ClusterInitialStatusBody(
-                ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId, 
-                store.getState(), store.getShutdownId(), 
+                ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId,
+                store.getState(), store.getShutdownId(),
                 initMap.getFirstConfigStr()
             ),
             self);
@@ -759,7 +761,7 @@ std::string Cluster::debugSnapshot() {
 // point we know the poller has stopped so no poller callbacks will be
 // invoked. We must ensure that CPG has also shut down so no CPG
 // callbacks will be invoked.
-// 
+//
 void Cluster::brokerShutdown()  {
     sys::ClusterSafeScope css; // Don't trigger cluster-safe asserts.
     try { cpg.shutdown(); }
@@ -775,7 +777,7 @@ void Cluster::updateRequest(const Member
 }
 
 void Cluster::initialStatus(const MemberId& member, uint32_t version, bool active,
-                            const framing::Uuid& id, 
+                            const framing::Uuid& id,
                             framing::cluster::StoreState store,
                             const framing::Uuid& shutdownId,
                             const std::string& firstConfig,
@@ -969,7 +971,7 @@ void Cluster::updateOutDone(Lock& l) {
 
 void Cluster::updateOutError(const std::exception& e)  {
     Monitor::ScopedLock l(lock);
-    QPID_LOG(error, *this << " error sending update: " << e.what());    
+    QPID_LOG(error, *this << " error sending update: " << e.what());
     updateOutDone(l);
 }
 
@@ -1067,7 +1069,7 @@ void Cluster::memberUpdate(Lock& l) {
 void Cluster::updateMgmtMembership(Lock& l) {
     if (!mgmtObject) return;
     std::vector<Url> urls = getUrls(l);
-    mgmtObject->set_clusterSize(urls.size()); 
+    mgmtObject->set_clusterSize(urls.size());
     string urlstr;
     for(std::vector<Url>::iterator i = urls.begin(); i != urls.end(); i++ ) {
         if (i != urls.begin()) urlstr += ";";

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/Event.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/Event.h?rev=1071410&r1=1071409&r2=1071410&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/Event.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/Event.h Wed Feb 16 21:38:38 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -53,7 +53,7 @@ class EventHeader {
 
     /** Size of payload data, excluding header. */
     size_t getSize() const { return size; }
-    /** Size of header + payload. */ 
+    /** Size of header + payload. */
     size_t getStoreSize() const { return size + HEADER_SIZE; }
 
     bool isCluster() const { return connectionId.getNumber() == 0; }
@@ -62,7 +62,7 @@ class EventHeader {
 
   protected:
     static const size_t HEADER_SIZE;
-    
+
     EventType type;
     ConnectionId connectionId;
     size_t size;
@@ -86,7 +86,7 @@ class Event : public EventHeader {
 
     /** Create a control event. */
     static Event control(const framing::AMQFrame&, const ConnectionId&);
-    
+
     // Data excluding header.
     char* getData() { return store + HEADER_SIZE; }
     const char* getData() const { return store + HEADER_SIZE; }
@@ -95,12 +95,12 @@ class Event : public EventHeader {
     char* getStore() { return store; }
     const char* getStore() const { return store; }
 
-    const framing::AMQFrame& getFrame() const;        
-    
+    const framing::AMQFrame& getFrame() const;
+
     operator framing::Buffer() const;
 
     iovec toIovec() const;
-    
+
   private:
     void encodeHeader() const;
 

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/EventFrame.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/EventFrame.h?rev=1071410&r1=1071409&r2=1071410&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/EventFrame.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/EventFrame.h Wed Feb 16 21:38:38 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -48,7 +48,7 @@ struct EventFrame
 
 
     ConnectionId connectionId;
-    framing::AMQFrame frame;   
+    framing::AMQFrame frame;
     int readCredit; ///< last frame in an event, give credit when processed.
     EventType type;
 };

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/ClusterSafe.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/ClusterSafe.cpp?rev=1071410&r1=1071409&r2=1071410&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/ClusterSafe.cpp (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/ClusterSafe.cpp Wed Feb 16 21:38:38 2011
@@ -34,8 +34,6 @@ QPID_TSS bool inContext = false;
 
 bool isClusterSafe() { return !inCluster || inContext; }
 
-bool isCluster() { return inCluster; }
-
 void assertClusterSafe()  {
     if (!isClusterSafe()) {
         QPID_LOG(critical, "Modified cluster state outside of cluster context");

Modified: qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/ClusterSafe.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/ClusterSafe.h?rev=1071410&r1=1071409&r2=1071410&view=diff
==============================================================================
--- qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/ClusterSafe.h (original)
+++ qpid/branches/qpid-2935/qpid/cpp/src/qpid/sys/ClusterSafe.h Wed Feb 16 21:38:38 2011
@@ -52,9 +52,6 @@ QPID_COMMON_EXTERN void assertClusterSaf
  */
 QPID_COMMON_EXTERN bool isClusterSafe();
 
-/** Return true in a clustered broker */
-QPID_COMMON_EXTERN bool isCluster();
-
 /**
  * Base class for classes that encapsulate state which is replicated
  * to all members of a cluster. Acts as a marker for clustered state



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org