You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/01/29 23:59:10 UTC

svn commit: r904656 - in /qpid/trunk/qpid/cpp: src/ src/qpid/broker/ src/qpid/cluster/ src/qpid/management/ src/qpid/sys/ xml/

Author: aconway
Date: Fri Jan 29 22:59:09 2010
New Revision: 904656

URL: http://svn.apache.org/viewvc?rev=904656&view=rev
Log:
Replace PeriodicTimer with ClusterTimer, which inherits from Timer.

Added:
    qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterTimer.h
Modified:
    qpid/trunk/qpid/cpp/src/CMakeLists.txt
    qpid/trunk/qpid/cpp/src/Makefile.am
    qpid/trunk/qpid/cpp/src/cluster.cmake
    qpid/trunk/qpid/cpp/src/cluster.mk
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
    qpid/trunk/qpid/cpp/src/qpid/sys/Timer.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/Timer.h
    qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=904656&r1=904655&r2=904656&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Fri Jan 29 22:59:09 2010
@@ -573,7 +573,6 @@
      qpid/sys/Runnable.cpp
      qpid/sys/Shlib.cpp
      qpid/sys/Timer.cpp
-     qpid/sys/PeriodicTimerImpl.cpp
 )
 
 add_library (qpidcommon SHARED ${qpidcommon_SOURCES})
@@ -735,7 +734,6 @@
      qpid/broker/Connection.cpp
      qpid/broker/ConnectionHandler.cpp
      qpid/broker/ConnectionFactory.cpp
-     qpid/broker/DelegatingPeriodicTimer.cpp
      qpid/broker/DeliverableMessage.cpp
      qpid/broker/DeliveryRecord.cpp
      qpid/broker/DirectExchange.cpp

Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=904656&r1=904655&r2=904656&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Fri Jan 29 22:59:09 2010
@@ -471,9 +471,6 @@
   qpid/sys/TimeoutHandler.h			\
   qpid/sys/Timer.cpp				\
   qpid/sys/Timer.h				\
-  qpid/sys/PeriodicTimer.h			\
-  qpid/sys/PeriodicTimerImpl.h			\
-  qpid/sys/PeriodicTimerImpl.cpp		\
   qpid/sys/Waitable.h				\
   qpid/sys/alloca.h				\
   qpid/sys/uuid.h
@@ -508,8 +505,6 @@
   qpid/broker/Consumer.h \
   qpid/broker/Daemon.cpp \
   qpid/broker/Daemon.h \
-  qpid/broker/DelegatingPeriodicTimer.h \
-  qpid/broker/DelegatingPeriodicTimer.cpp \
   qpid/broker/Deliverable.h \
   qpid/broker/DeliverableMessage.cpp \
   qpid/broker/DeliverableMessage.h \

Modified: qpid/trunk/qpid/cpp/src/cluster.cmake
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/cluster.cmake?rev=904656&r1=904655&r2=904656&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/cluster.cmake (original)
+++ qpid/trunk/qpid/cpp/src/cluster.cmake Fri Jan 29 22:59:09 2010
@@ -81,6 +81,8 @@
        ${CMAN_SOURCES}
        qpid/cluster/Cluster.cpp
        qpid/cluster/Cluster.h
+       qpid/cluster/ClusterTimer.cpp
+       qpid/cluster/ClusterTimer.h
        qpid/cluster/Decoder.cpp
        qpid/cluster/Decoder.h
        qpid/cluster/PollableQueue.h
@@ -129,8 +131,6 @@
        qpid/cluster/MemberSet.h
        qpid/cluster/MemberSet.cpp
        qpid/cluster/types.h
-       qpid/cluster/PeriodicTimerImpl.h
-       qpid/cluster/PeriodicTimerImpl.cpp
        qpid/cluster/StoreStatus.h
        qpid/cluster/StoreStatus.cpp
       )

Modified: qpid/trunk/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/cluster.mk?rev=904656&r1=904655&r2=904656&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ qpid/trunk/qpid/cpp/src/cluster.mk Fri Jan 29 22:59:09 2010
@@ -40,6 +40,8 @@
   $(CMAN_SOURCES)				\
   qpid/cluster/Cluster.cpp			\
   qpid/cluster/Cluster.h			\
+  qpid/cluster/ClusterTimer.cpp			\
+  qpid/cluster/ClusterTimer.h			\
   qpid/cluster/Decoder.cpp			\
   qpid/cluster/Decoder.h			\
   qpid/cluster/PollableQueue.h			\
@@ -88,8 +90,6 @@
   qpid/cluster/MemberSet.h			\
   qpid/cluster/MemberSet.cpp			\
   qpid/cluster/types.h				\
-  qpid/cluster/PeriodicTimerImpl.h		\
-  qpid/cluster/PeriodicTimerImpl.cpp		\
   qpid/cluster/StoreStatus.h			\
   qpid/cluster/StoreStatus.cpp
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=904656&r1=904655&r2=904656&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Fri Jan 29 22:59:09 2010
@@ -49,7 +49,6 @@
 #include "qpid/sys/ConnectionInputHandlerFactory.h"
 #include "qpid/sys/TimeoutHandler.h"
 #include "qpid/sys/SystemInfo.h"
-#include "qpid/sys/PeriodicTimerImpl.h"
 #include "qpid/Address.h"
 #include "qpid/Url.h"
 #include "qpid/Version.h"
@@ -257,11 +256,7 @@
     // Initialize plugins
     Plugin::initializeAll(*this);
 
-    if (!periodicTimer.hasDelegate()) {
-        // If no plugin has contributed a PeriodicTimer, use the default one.
-        periodicTimer.setDelegate(
-            std::auto_ptr<sys::PeriodicTimer>(new sys::PeriodicTimerImpl(timer)));
-    }
+    if (managementAgent.get()) managementAgent->pluginsInitialized();
 
     if (conf.queueCleanInterval) {
         queueCleaner.start(conf.queueCleanInterval * qpid::sys::TIME_SEC);
@@ -474,8 +469,8 @@
     return knownBrokers;
 }
 
-void Broker::setPeriodicTimer(std::auto_ptr<sys::PeriodicTimer> pt) {
-    periodicTimer.setDelegate(pt);
+void Broker::setClusterTimer(std::auto_ptr<sys::Timer> t) {
+    clusterTimer = t;
 }
 
 const std::string Broker::TCP_TRANSPORT("tcp");

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=904656&r1=904655&r2=904656&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Fri Jan 29 22:59:09 2010
@@ -25,7 +25,6 @@
 #include "qpid/broker/BrokerImportExport.h"
 #include "qpid/broker/ConnectionFactory.h"
 #include "qpid/broker/ConnectionToken.h"
-#include "qpid/broker/DelegatingPeriodicTimer.h"
 #include "qpid/broker/DirectExchange.h"
 #include "qpid/broker/DtxManager.h"
 #include "qpid/broker/ExchangeRegistry.h"
@@ -147,7 +146,7 @@
 
     boost::shared_ptr<sys::Poller> poller;
     sys::Timer timer;
-    DelegatingPeriodicTimer periodicTimer;
+    std::auto_ptr<sys::Timer> clusterTimer;
     Options config;
     std::auto_ptr<management::ManagementAgent> managementAgent;
     ProtocolFactoryMap protocolFactories;
@@ -254,9 +253,12 @@
     boost::shared_ptr<sys::ConnectionCodec::Factory> getConnectionFactory() { return factory; }
     void setConnectionFactory(boost::shared_ptr<sys::ConnectionCodec::Factory> f) { factory = f; }
 
+    /** Timer for local tasks affecting only this broker */
     sys::Timer& getTimer() { return timer; }
-    sys::PeriodicTimer& getPeriodicTimer() { return periodicTimer; }
-    void setPeriodicTimer(std::auto_ptr<sys::PeriodicTimer> pt);
+
+    /** Timer for tasks that must be synchronized if we are in a cluster */
+    sys::Timer& getClusterTimer() { return clusterTimer.get() ? *clusterTimer : timer; }
+    void setClusterTimer(std::auto_ptr<sys::Timer>);
 
     boost::function<std::vector<Url> ()> getKnownBrokers;
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=904656&r1=904655&r2=904656&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Jan 29 22:59:09 2010
@@ -112,7 +112,7 @@
 #include "qpid/cluster/RetractClient.h"
 #include "qpid/cluster/FailoverExchange.h"
 #include "qpid/cluster/UpdateExchange.h"
-#include "qpid/cluster/PeriodicTimerImpl.h"
+#include "qpid/cluster/ClusterTimer.h"
 
 #include "qpid/assert.h"
 #include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h"
@@ -137,7 +137,7 @@
 #include "qpid/framing/ClusterUpdateRequestBody.h"
 #include "qpid/framing/ClusterConnectionAnnounceBody.h"
 #include "qpid/framing/ClusterErrorCheckBody.h"
-#include "qpid/framing/ClusterPeriodicTimerBody.h"
+#include "qpid/framing/ClusterTimerWakeupBody.h"
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/log/Helpers.h"
 #include "qpid/log/Statement.h"
@@ -179,7 +179,7 @@
  * Currently use SVN revision to avoid clashes with versions from
  * different branches.
  */
-const uint32_t Cluster::CLUSTER_VERSION = 903171;
+const uint32_t Cluster::CLUSTER_VERSION = 904565;
 
 struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
     qpid::cluster::Cluster& cluster;
@@ -209,9 +209,8 @@
     void errorCheck(uint8_t type, const framing::SequenceNumber& frameSeq) {
         cluster.errorCheck(member, type, frameSeq, l);
     }
-    void periodicTimer(const std::string& name) {
-        cluster.periodicTimer(member, name, l);
-    }
+    void timerWakeup(const std::string& name) { cluster.timerWakeup(member, name, l); }
+    void timerDrop(const std::string& name) { cluster.timerWakeup(member, name, l); }
 
     void shutdown(const Uuid& id) { cluster.shutdown(member, id, l); }
 
@@ -245,6 +244,7 @@
     state(INIT),
     initMap(self, settings.size),
     store(broker.getDataDir().getPath()),
+    elder(false),
     lastSize(0),
     lastBroker(false),
     updateRetracted(false),
@@ -252,8 +252,8 @@
 {
     // We give ownership of the timer to the broker and keep a plain pointer.
     // This is OK as it means the timer has the same lifetime as the broker.
-    timer = new PeriodicTimerImpl(*this);
-    broker.setPeriodicTimer(std::auto_ptr<sys::PeriodicTimer>(timer));
+    timer = new ClusterTimer(*this);
+    broker.setClusterTimer(std::auto_ptr<sys::Timer>(timer));
 
     mAgent = broker.getManagementAgent();
     if (mAgent != 0){
@@ -577,14 +577,13 @@
         initMap.checkConsistent();
         elders = initMap.getElders();
         QPID_LOG(debug, *this << " elders: " << elders);
-        if (!elders.empty()) { // I'm not the elder, I don't handle links & replication.
+        if (elders.empty())
+            becomeElder();
+        else {
             broker.getLinks().setPassive(true);
             broker.getQueueEvents().disable();
             QPID_LOG(info, *this << " not active for links.");
         }
-        else {
-            QPID_LOG(info, this << " active for links.");
-        }
         setClusterId(initMap.getClusterId(), l);
         if (store.hasStore()) store.dirty(clusterId);
 
@@ -636,14 +635,19 @@
 
     if (state >= CATCHUP && memberChange) {
         memberUpdate(l);
-        if (elders.empty()) {
-            // We are the oldest, reactive links if necessary
-            QPID_LOG(info, this << " becoming active for links.");
-            broker.getLinks().setPassive(false);
-        }
+        if (elders.empty()) becomeElder();
     }
 }
 
+void Cluster::becomeElder() {
+    if (elder) return;          // We were already the elder.
+    // We are the oldest, reactive links if necessary
+    QPID_LOG(info, *this << " became the elder, active for links.");
+    elder = true;
+    broker.getLinks().setPassive(false);
+    timer->becomeElder();
+}
+
 void Cluster::makeOffer(const MemberId& id, Lock& ) {
     if (state == READY && map.isJoiner(id)) {
         state = OFFER;
@@ -962,13 +966,17 @@
         error.respondNone(from, type, frameSeq);
 }
 
-void Cluster::periodicTimer(const MemberId&, const std::string& name, Lock&) {
-    timer->deliver(name);
+void Cluster::timerWakeup(const MemberId& , const std::string& name, Lock&) {
+    timer->deliverWakeup(name);
+}
+
+void Cluster::timerDrop(const MemberId& , const std::string& name, Lock&) {
+    timer->deliverDrop(name);
 }
 
 bool Cluster::isElder() const {
     Monitor::ScopedLock l(lock);
-    return state >= CATCHUP && elders.empty();
+    return elder;
 }
 
 }} // namespace qpid::cluster

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=904656&r1=904655&r2=904656&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Fri Jan 29 22:59:09 2010
@@ -63,7 +63,7 @@
 
 class Connection;
 class EventFrame;
-class PeriodicTimerImpl;
+class ClusterTimer;
 
 /**
  * Connection to the cluster
@@ -164,7 +164,8 @@
     void configChange(const MemberId&, const std::string& current, Lock& l);
     void messageExpired(const MemberId&, uint64_t, Lock& l);
     void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&);
-    void periodicTimer(const MemberId&, const std::string& name, Lock&);
+    void timerWakeup(const MemberId&, const std::string& name, Lock&);
+    void timerDrop(const MemberId&, const std::string& name, Lock&);
 
     void shutdown(const MemberId&, const framing::Uuid& shutdownId, Lock&);
 
@@ -201,6 +202,8 @@
         const struct cpg_address */*joined*/, int /*nJoined*/
     );
 
+    void becomeElder();
+
     // == Called in management threads.
     virtual qpid::management::ManagementObject* GetManagementObject() const;
     virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
@@ -265,6 +268,7 @@
     StoreStatus store;
     ClusterMap map;
     MemberSet elders;
+    bool elder;
     size_t lastSize;
     bool lastBroker;
     sys::Thread updateThread;
@@ -272,7 +276,7 @@
     bool updateRetracted;
     ErrorCheck error;
     UpdateReceiver updateReceiver;
-    PeriodicTimerImpl* timer;
+    ClusterTimer* timer;
 
   friend std::ostream& operator<<(std::ostream&, const Cluster&);
   friend class ClusterDispatcher;

Added: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp?rev=904656&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp Fri Jan 29 22:59:09 2010
@@ -0,0 +1,114 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Cluster.h"
+#include "ClusterTimer.h"
+#include "qpid/log/Statement.h"
+#include "qpid/framing/ClusterTimerWakeupBody.h"
+#include "qpid/framing/ClusterTimerDropBody.h"
+
+namespace qpid {
+namespace cluster {
+
+using boost::intrusive_ptr;
+using std::max;
+using sys::Timer;
+using sys::TimerTask;
+
+
+ClusterTimer::ClusterTimer(Cluster& c) : cluster(c) {}
+
+ClusterTimer::~ClusterTimer() {}
+
+// Initialization or deliver thread.
+void ClusterTimer::add(intrusive_ptr<TimerTask> task)
+{
+    QPID_LOG(trace, "Adding cluster timer task " << task->getName());
+    Map::iterator i = map.find(task->getName());
+    if (i != map.end())
+        throw Exception(QPID_MSG("Task already exists with name " << task->getName()));
+    map[task->getName()] = task;
+    // Only the elder actually activates the task with the Timer base class.
+    if (cluster.isElder()) {
+        QPID_LOG(trace, "Elder activating cluster timer task " << task->getName());
+        Timer::add(task);
+    }
+}
+
+// Timer thread
+void ClusterTimer::fire(intrusive_ptr<TimerTask> t) {
+    // Elder mcasts wakeup on fire, task is not fired until deliverWakeup
+    if (cluster.isElder()) {
+        QPID_LOG(trace, "Sending cluster timer wakeup " << t->getName());
+        cluster.getMulticast().mcastControl(
+            framing::ClusterTimerWakeupBody(framing::ProtocolVersion(), t->getName()),
+            cluster.getId());
+    }
+    else
+        QPID_LOG(trace, "Cluster timer task fired, but not elder " << t->getName());
+}
+
+// Timer thread
+void ClusterTimer::drop(intrusive_ptr<TimerTask> t) {
+    // Elder mcasts drop, task is droped in deliverDrop
+    if (cluster.isElder()) {
+        QPID_LOG(trace, "Sending cluster timer drop " << t->getName());
+        cluster.getMulticast().mcastControl(
+            framing::ClusterTimerDropBody(framing::ProtocolVersion(), t->getName()),
+            cluster.getId());
+    }
+    else
+        QPID_LOG(trace, "Cluster timer task dropped, but not on elder " << t->getName());
+}
+
+// Deliver thread
+void ClusterTimer::deliverWakeup(const std::string& name) {
+    QPID_LOG(trace, "Cluster timer wakeup delivered for " << name);
+    Map::iterator i = map.find(name);
+    if (i == map.end())
+        throw Exception(QPID_MSG("Cluster timer wakeup non-existent task " << name));
+    else {
+        intrusive_ptr<TimerTask> t = i->second;
+        map.erase(i);
+        Timer::fire(t);
+    }
+}
+
+// Deliver thread
+void ClusterTimer::deliverDrop(const std::string& name) {
+    QPID_LOG(trace, "Cluster timer drop delivered for " << name);
+    Map::iterator i = map.find(name);
+    if (i == map.end())
+        throw Exception(QPID_MSG("Cluster timer drop non-existent task " << name));
+    else {
+        intrusive_ptr<TimerTask> t = i->second;
+        map.erase(i);
+    }
+}
+
+// Deliver thread
+void ClusterTimer::becomeElder() {
+    for (Map::iterator i = map.begin(); i != map.end(); ++i) {
+        Timer::add(i->second);
+    }
+}
+
+}}

Added: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterTimer.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterTimer.h?rev=904656&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterTimer.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterTimer.h Fri Jan 29 22:59:09 2010
@@ -0,0 +1,58 @@
+#ifndef QPID_CLUSTER_CLUSTERTIMER_H
+#define QPID_CLUSTER_CLUSTERTIMER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Timer.h"
+#include <map>
+
+namespace qpid {
+namespace cluster {
+
+class Cluster;
+
+class ClusterTimer : public sys::Timer {
+  public:
+    ClusterTimer(Cluster&);
+    ~ClusterTimer();
+
+    void add(boost::intrusive_ptr<sys::TimerTask> task);
+
+    void deliverWakeup(const std::string& name);
+    void deliverDrop(const std::string& name);
+    void becomeElder();
+
+  protected:
+    void fire(boost::intrusive_ptr<sys::TimerTask> task);
+    void drop(boost::intrusive_ptr<sys::TimerTask> task);
+
+  private:
+    typedef std::map<std::string, boost::intrusive_ptr<sys::TimerTask> > Map;
+    Cluster& cluster;
+    Map map;
+};
+
+
+}}
+
+
+#endif  /*!QPID_CLUSTER_CLUSTERTIMER_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=904656&r1=904655&r2=904656&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Fri Jan 29 22:59:09 2010
@@ -52,7 +52,8 @@
 }
 
 ManagementAgent::ManagementAgent () :
-    threadPoolSize(1), interval(10), broker(0), startTime(uint64_t(Duration(now())))
+    threadPoolSize(1), interval(10), broker(0), timer(0),
+    startTime(uint64_t(Duration(now())))
 {
     nextObjectId   = 1;
     brokerBank     = 1;
@@ -91,12 +92,8 @@
     dataDir        = _dataDir;
     interval       = _interval;
     broker         = _broker;
-    timer          = &_broker->getPeriodicTimer();
     threadPoolSize = _threads;
     ManagementObject::maxThreads = threadPoolSize;
-    timer->add (boost::bind(&ManagementAgent::periodicProcessing, this),
-                interval * sys::TIME_SEC,
-                "ManagementAgent::periodicProcessing");
 
     // Get from file or generate and save to file.
     if (dataDir.empty())
@@ -135,6 +132,12 @@
     }
 }
 
+void ManagementAgent::pluginsInitialized() {
+    // Do this here so cluster plugin has the chance to set up the timer.
+    timer          = &broker->getClusterTimer();
+    timer->add(new Periodic(*this, interval));
+}
+
 void ManagementAgent::writeData ()
 {
     string   filename (dataDir + "/.mbrokerdata");
@@ -233,6 +236,19 @@
                "console.event.1.0." + event.getPackageName() + "." + event.getEventName());
 }
 
+ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds)
+    : TimerTask (qpid::sys::Duration((_seconds ? _seconds : 1) * qpid::sys::TIME_SEC),
+                 "ManagementAgent::periodicProcessing"),
+                 agent(_agent) {}
+
+ManagementAgent::Periodic::~Periodic () {}
+
+void ManagementAgent::Periodic::fire ()
+{
+    agent.timer->add (new Periodic (agent, agent.interval));
+    agent.periodicProcessing ();
+}
+
 void ManagementAgent::clientAdded (const std::string& routingKey)
 {
     if (routingKey.find("console") != 0)

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=904656&r1=904655&r2=904656&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h Fri Jan 29 22:59:09 2010
@@ -26,7 +26,7 @@
 #include "qpid/broker/Exchange.h"
 #include "qpid/framing/Uuid.h"
 #include "qpid/sys/Mutex.h"
-#include "qpid/sys/PeriodicTimer.h"
+#include "qpid/sys/Timer.h"
 #include "qpid/broker/ConnectionToken.h"
 #include "qpid/management/ManagementObject.h"
 #include "qpid/management/ManagementEvent.h"
@@ -65,8 +65,12 @@
     ManagementAgent ();
     virtual ~ManagementAgent ();
 
+    /** Called before plugins are initialized */
     void configure       (const std::string& dataDir, uint16_t interval,
                           qpid::broker::Broker* broker, int threadPoolSize);
+    /** Called after plugins are initialized. */
+    void pluginsInitialized();
+
     void setInterval     (uint16_t _interval) { interval = _interval; }
     void setExchange     (qpid::broker::Exchange::shared_ptr mgmtExchange,
                           qpid::broker::Exchange::shared_ptr directExchange);
@@ -112,6 +116,15 @@
     void setBootSequence(uint16_t b) { bootSequence = b; }
 
 private:
+    struct Periodic : public qpid::sys::TimerTask
+    {
+        ManagementAgent& agent;
+
+        Periodic (ManagementAgent& agent, uint32_t seconds);
+        virtual ~Periodic ();
+        void fire ();
+    };
+
     //  Storage for tracking remote management agents, attached via the client
     //  management agent API.
     //
@@ -203,7 +216,7 @@
     std::string                  dataDir;
     uint16_t                     interval;
     qpid::broker::Broker*        broker;
-    qpid::sys::PeriodicTimer*    timer;
+    qpid::sys::Timer*            timer;
     uint16_t                     bootSequence;
     uint32_t                     nextObjectId;
     uint32_t                     brokerBank;

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/Timer.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/Timer.cpp?rev=904656&r1=904655&r2=904656&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/Timer.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/Timer.cpp Fri Jan 29 22:59:09 2010
@@ -30,14 +30,16 @@
 namespace qpid {
 namespace sys {
 
-TimerTask::TimerTask(Duration timeout) :
+TimerTask::TimerTask(Duration timeout, const std::string&  n) :
+    name(n),
     sortTime(AbsTime::FarFuture()),
     period(timeout),
     nextFireTime(AbsTime::now(), timeout),
     cancelled(false)
 {}
 
-TimerTask::TimerTask(AbsTime time) :
+TimerTask::TimerTask(AbsTime time, const std::string&  n) :
+    name(n),
     sortTime(AbsTime::FarFuture()),
     period(0),
     nextFireTime(time),
@@ -102,13 +104,15 @@
             {
             ScopedLock<Mutex> l(t->callbackLock);
             if (t->cancelled) {
+                drop(t);
                 if (delay > 500 * TIME_MSEC) {
-                    QPID_LOG(debug, "cancelled Timer woken up " << delay / TIME_MSEC << "ms late");
+                    QPID_LOG(debug, "cancelled Timer woken up " << delay / TIME_MSEC
+                             << "ms late");
                 }
                 continue;
             } else if(Duration(t->nextFireTime, start) >= 0) {
                 Monitor::ScopedUnlock u(monitor);
-                t->fireTask();
+                fire(t);
                 // Warn on callback overrun
                 AbsTime end(AbsTime::now());
                 Duration overrun(tasks.top()->nextFireTime, end);
@@ -169,6 +173,14 @@
     runner.join();
 }
 
+// Allow subclasses to override behavior when firing a task.
+void Timer::fire(boost::intrusive_ptr<TimerTask> t) {
+    t->fireTask();
+}
+
+// Provided for subclasses: called when a task is droped.
+void Timer::drop(boost::intrusive_ptr<TimerTask>) {}
+
 bool operator<(const intrusive_ptr<TimerTask>& a,
                        const intrusive_ptr<TimerTask>& b)
 {

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/Timer.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/Timer.h?rev=904656&r1=904655&r2=904656&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/Timer.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/Timer.h Fri Jan 29 22:59:09 2010
@@ -38,10 +38,11 @@
 class Timer;
 
 class TimerTask : public RefCounted {
-    friend class Timer;
-    friend bool operator<(const boost::intrusive_ptr<TimerTask>&,
-                const boost::intrusive_ptr<TimerTask>&);
+  friend class Timer;
+  friend bool operator<(const boost::intrusive_ptr<TimerTask>&,
+                        const boost::intrusive_ptr<TimerTask>&);
 
+    std::string name;
     AbsTime sortTime;
     Duration period;
     AbsTime nextFireTime;
@@ -51,30 +52,26 @@
     bool readyToFire() const;
     void fireTask();
 
-public:
-    QPID_COMMON_EXTERN TimerTask(Duration period);
-    QPID_COMMON_EXTERN TimerTask(AbsTime fireTime);
+  public:
+    QPID_COMMON_EXTERN TimerTask(Duration period, const std::string& name=std::string());
+    QPID_COMMON_EXTERN TimerTask(AbsTime fireTime, const std::string& name=std::string());
     QPID_COMMON_EXTERN virtual ~TimerTask();
 
     QPID_COMMON_EXTERN void setupNextFire();
     QPID_COMMON_EXTERN void restart();
     QPID_COMMON_EXTERN void cancel();
 
-protected:
+    std::string getName() const { return name; }
+
+  protected:
     // Must be overridden with callback
     virtual void fire() = 0;
 };
 
 // For the priority_queue order
 bool operator<(const boost::intrusive_ptr<TimerTask>& a,
-                const boost::intrusive_ptr<TimerTask>& b);
-
-/**
-   A timer to trigger tasks that are local to one broker.
+               const boost::intrusive_ptr<TimerTask>& b);
 
-   For periodic tasks that should be synchronized across all brokers
-   in a cluster, use qpid::sys::PeriodicTimer.
- */
 class Timer : private Runnable {
     qpid::sys::Monitor monitor;
     std::priority_queue<boost::intrusive_ptr<TimerTask> > tasks;
@@ -84,13 +81,17 @@
     // Runnable interface
     void run();
 
-public:
+  public:
     QPID_COMMON_EXTERN Timer();
-    QPID_COMMON_EXTERN ~Timer();
+    QPID_COMMON_EXTERN virtual ~Timer();
 
-    QPID_COMMON_EXTERN void add(boost::intrusive_ptr<TimerTask> task);
-    QPID_COMMON_EXTERN void start();
-    QPID_COMMON_EXTERN void stop();
+    QPID_COMMON_EXTERN virtual void add(boost::intrusive_ptr<TimerTask> task);
+    QPID_COMMON_EXTERN virtual void start();
+    QPID_COMMON_EXTERN virtual void stop();
+
+  protected:
+    QPID_COMMON_EXTERN virtual void fire(boost::intrusive_ptr<TimerTask> task);
+    QPID_COMMON_EXTERN virtual void drop(boost::intrusive_ptr<TimerTask> task);
 };
 
 

Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=904656&r1=904655&r2=904656&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Fri Jan 29 22:59:09 2010
@@ -95,8 +95,12 @@
       <field name="frame-seq" type="sequence-no"/>
     </control>
 
-    <!-- Synchronize periodic timer tasks across the cluster -->
-    <control name="periodic-timer" code="0x15">
+    <!-- Synchronize timer tasks across the cluster -->
+    <control name="timer-wakeup" code="0x15">
+      <field name="name" type="str16"/>
+    </control>
+
+    <control name="timer-drop" code="0x16">
       <field name="name" type="str16"/>
     </control>
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org