You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/01/27 23:21:29 UTC

svn commit: r903869 - in /qpid/trunk/qpid/cpp: include/qpid/sys/ src/ src/qpid/broker/ src/qpid/cluster/

Author: aconway
Date: Wed Jan 27 22:21:28 2010
New Revision: 903869

URL: http://svn.apache.org/viewvc?rev=903869&view=rev
Log:
QPID-2634: Management updates in timer create inconsistencies in a cluster.

Cluster plugin provides a PeriodicTimer implementation to the broker
which executes tasks in the cluster dispatch thread simultaneously
across the cluster.

Added:
    qpid/trunk/qpid/cpp/src/qpid/broker/DelegatingPeriodicTimer.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/DelegatingPeriodicTimer.h
Modified:
    qpid/trunk/qpid/cpp/include/qpid/sys/Time.h
    qpid/trunk/qpid/cpp/src/CMakeLists.txt
    qpid/trunk/qpid/cpp/src/Makefile.am
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/PeriodicTimerImpl.cpp

Modified: qpid/trunk/qpid/cpp/include/qpid/sys/Time.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/sys/Time.h?rev=903869&r1=903868&r2=903869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/sys/Time.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/sys/Time.h Wed Jan 27 22:21:28 2010
@@ -122,7 +122,7 @@
     friend class AbsTime;
 
 public:
-    QPID_COMMON_EXTERN inline Duration(int64_t time0);
+    QPID_COMMON_EXTERN inline Duration(int64_t time0 = 0);
     QPID_COMMON_EXTERN explicit Duration(const AbsTime& time0);
     QPID_COMMON_EXTERN explicit Duration(const AbsTime& start, const AbsTime& finish);
     inline operator int64_t() const;

Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=903869&r1=903868&r2=903869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Wed Jan 27 22:21:28 2010
@@ -733,6 +733,7 @@
      qpid/broker/Connection.cpp
      qpid/broker/ConnectionHandler.cpp
      qpid/broker/ConnectionFactory.cpp
+     qpid/broker/DelegatingPeriodicTimer.cpp
      qpid/broker/DeliverableMessage.cpp
      qpid/broker/DeliveryRecord.cpp
      qpid/broker/DirectExchange.cpp

Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=903869&r1=903868&r2=903869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Wed Jan 27 22:21:28 2010
@@ -508,6 +508,8 @@
   qpid/broker/Consumer.h \
   qpid/broker/Daemon.cpp \
   qpid/broker/Daemon.h \
+  qpid/broker/DelegatingPeriodicTimer.h \
+  qpid/broker/DelegatingPeriodicTimer.cpp \
   qpid/broker/Deliverable.h \
   qpid/broker/DeliverableMessage.cpp \
   qpid/broker/DeliverableMessage.h \

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=903869&r1=903868&r2=903869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Wed Jan 27 22:21:28 2010
@@ -137,7 +137,6 @@
 
 Broker::Broker(const Broker::Options& conf) :
     poller(new Poller),
-    periodicTimer(new sys::PeriodicTimerImpl(timer)),
     config(conf),
     managementAgent(conf.enableMgmt ? new ManagementAgent() : 0),
     store(new NullMessageStore),
@@ -258,6 +257,12 @@
     // Initialize plugins
     Plugin::initializeAll(*this);
 
+    if (!periodicTimer.hasDelegate()) {
+        // If no plugin has contributed a PeriodicTimer, use the default one.
+        periodicTimer.setDelegate(
+            std::auto_ptr<sys::PeriodicTimer>(new sys::PeriodicTimerImpl(timer)));
+    }
+
     if (conf.queueCleanInterval) {
         queueCleaner.start(conf.queueCleanInterval * qpid::sys::TIME_SEC);
     }
@@ -469,6 +474,10 @@
     return knownBrokers;
 }
 
+void Broker::setPeriodicTimer(std::auto_ptr<sys::PeriodicTimer> pt) {
+    periodicTimer.setDelegate(pt); // Ownership of pt passes to the delegating timer.
+}
+
 const std::string Broker::TCP_TRANSPORT("tcp");
 
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=903869&r1=903868&r2=903869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Wed Jan 27 22:21:28 2010
@@ -25,6 +25,7 @@
 #include "qpid/broker/BrokerImportExport.h"
 #include "qpid/broker/ConnectionFactory.h"
 #include "qpid/broker/ConnectionToken.h"
+#include "qpid/broker/DelegatingPeriodicTimer.h"
 #include "qpid/broker/DirectExchange.h"
 #include "qpid/broker/DtxManager.h"
 #include "qpid/broker/ExchangeRegistry.h"
@@ -49,7 +50,6 @@
 #include "qpid/framing/ProtocolInitiation.h"
 #include "qpid/sys/Runnable.h"
 #include "qpid/sys/Timer.h"
-#include "qpid/sys/PeriodicTimer.h"
 #include "qpid/RefCounted.h"
 #include "qpid/broker/AclModule.h"
 #include "qpid/sys/Mutex.h"
@@ -147,7 +147,7 @@
 
     boost::shared_ptr<sys::Poller> poller;
     sys::Timer timer;
-    std::auto_ptr<sys::PeriodicTimer> periodicTimer;
+    DelegatingPeriodicTimer periodicTimer;
     Options config;
     std::auto_ptr<management::ManagementAgent> managementAgent;
     ProtocolFactoryMap protocolFactories;
@@ -255,8 +255,8 @@
     void setConnectionFactory(boost::shared_ptr<sys::ConnectionCodec::Factory> f) { factory = f; }
 
     sys::Timer& getTimer() { return timer; }
-    sys::PeriodicTimer& getPeriodicTimer() { return *periodicTimer; }
-    void setPeriodicTimer(std::auto_ptr<sys::PeriodicTimer> pt) { periodicTimer = pt; }
+    sys::PeriodicTimer& getPeriodicTimer() { return periodicTimer; }
+    void setPeriodicTimer(std::auto_ptr<sys::PeriodicTimer> pt);
 
     boost::function<std::vector<Url> ()> getKnownBrokers;
 

Added: qpid/trunk/qpid/cpp/src/qpid/broker/DelegatingPeriodicTimer.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DelegatingPeriodicTimer.cpp?rev=903869&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DelegatingPeriodicTimer.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DelegatingPeriodicTimer.cpp Wed Jan 27 22:21:28 2010
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "DelegatingPeriodicTimer.h"
+
+namespace qpid {
+namespace broker {
+
+DelegatingPeriodicTimer::DelegatingPeriodicTimer() : delegate(), entries() {} // No delegate, no stored tasks.
+
+/** Add a periodic task: forward it to the delegate, or store it if there is none yet. */
+void DelegatingPeriodicTimer::add(
+    const Task& task, sys::Duration period, const std::string& taskName)
+{
+    // Once a delegate exists every task goes straight to it.
+    if (delegate.get()) {
+        delegate->add(task, period, taskName);
+        return;
+    }
+    // No delegate yet: remember the task so setDelegate() can hand it over.
+    Entry pending = { task, period, taskName };
+    entries.push_back(pending);
+}
+
+void DelegatingPeriodicTimer::setDelegate(std::auto_ptr<PeriodicTimer> impl) {
+    assert(impl.get() && !delegate.get()); // Needs a non-null impl; may be set only once.
+    delegate = impl;                       // Takes ownership; impl is null from here on.
+    for (Entries::iterator i = entries.begin(); i != entries.end(); ++i)
+        delegate->add(i->task, i->period, i->name); // Replay tasks stored by add().
+    entries.clear(); // add() never stores again once a delegate exists: drop the copies.
+}
+
+}} // namespace qpid::broker

Added: qpid/trunk/qpid/cpp/src/qpid/broker/DelegatingPeriodicTimer.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DelegatingPeriodicTimer.h?rev=903869&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DelegatingPeriodicTimer.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DelegatingPeriodicTimer.h Wed Jan 27 22:21:28 2010
@@ -0,0 +1,57 @@
+#ifndef QPID_BROKER_DELEGATINGPERIODICTIMER_H
+#define QPID_BROKER_DELEGATINGPERIODICTIMER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/PeriodicTimer.h"
+#include <vector>
+#include <memory>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * A PeriodicTimer implementation that delegates to another PeriodicTimer.
+ *
+ * Tasks added while there is no delegate timer are stored.
+ * When a delegate timer is set, stored tasks are added to it.
+ */
+class DelegatingPeriodicTimer : public sys::PeriodicTimer
+{
+  public:
+    DelegatingPeriodicTimer();
+    /** Add a task: forwarded to the delegate if set, otherwise stored until one is. */
+    void add(const Task& task, sys::Duration period, const std::string& taskName);
+    /** Set the delegate (non-null, once only); transfers ownership of delegate. */
+    void setDelegate(std::auto_ptr<PeriodicTimer> delegate);
+    bool hasDelegate() const { return delegate.get() != 0; } // True once a delegate is set.
+  private:
+    struct Entry { Task task; sys::Duration period; std::string name; };
+    typedef std::vector<Entry> Entries;
+    std::auto_ptr<PeriodicTimer> delegate;
+    Entries entries;
+
+};
+
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_DELEGATINGPERIODICTIMER_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=903869&r1=903868&r2=903869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed Jan 27 22:21:28 2010
@@ -250,9 +250,10 @@
     updateRetracted(false),
     error(*this)
 {
-    // FIXME aconway 2010-01-26: must be done before management registers with timer.
-    broker.setPeriodicTimer(
-        std::auto_ptr<sys::PeriodicTimer>(new PeriodicTimerImpl(*this)));
+    // We give ownership of the timer to the broker and keep a plain pointer.
+    // This is OK as it means the timer has the same lifetime as the broker.
+    timer = new PeriodicTimerImpl(*this);
+    broker.setPeriodicTimer(std::auto_ptr<sys::PeriodicTimer>(timer));
 
     mAgent = broker.getManagementAgent();
     if (mAgent != 0){
@@ -448,8 +449,8 @@
 // Handler for deliverFrameQueue.
 // This thread executes the main logic.
 void Cluster::deliveredFrame(const EventFrame& efConst) {
-    sys::ClusterSafeScope css; // Don't trigger cluster-safe asserts.
     Mutex::ScopedLock l(lock);
+    sys::ClusterSafeScope css; // Don't trigger cluster-safe asserts.
     if (state == LEFT) return;
     EventFrame e(efConst);
     const ClusterUpdateOfferBody* offer = castUpdateOffer(e.frame.getBody());
@@ -961,13 +962,13 @@
         error.respondNone(from, type, frameSeq);
 }
 
-void Cluster::periodicTimer(const MemberId&, const std::string& , Lock&) {
-    // FIXME aconway 2010-01-26:
+void Cluster::periodicTimer(const MemberId&, const std::string& name, Lock&) {
+    timer->deliver(name);
 }
 
 bool Cluster::isElder() const {
-    Mutex::ScopedLock l(lock);
-    return elders.empty();
+    Monitor::ScopedLock l(lock);
+    return state >= CATCHUP && elders.empty();
 }
 
 }} // namespace qpid::cluster

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=903869&r1=903868&r2=903869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Wed Jan 27 22:21:28 2010
@@ -63,6 +63,7 @@
 
 class Connection;
 class EventFrame;
+class PeriodicTimerImpl;
 
 /**
  * Connection to the cluster
@@ -271,6 +272,7 @@
     bool updateRetracted;
     ErrorCheck error;
     UpdateReceiver updateReceiver;
+    PeriodicTimerImpl* timer;
 
   friend std::ostream& operator<<(std::ostream&, const Cluster&);
   friend class ClusterDispatcher;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/PeriodicTimerImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/PeriodicTimerImpl.cpp?rev=903869&r1=903868&r2=903869&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/PeriodicTimerImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/PeriodicTimerImpl.cpp Wed Jan 27 22:21:28 2010
@@ -32,20 +32,24 @@
     Cluster& c, const Task& t, sys::Duration d, const std::string& n)
     : TimerTask(d), cluster(c), timer(c.getBroker().getTimer()),
       task(t), name(n), inFlight(false)
-{}
+{
+    timer.add(this);
+}
 
 void PeriodicTimerImpl::TaskEntry::fire() {
+    setupNextFire();
+    timer.add(this);
+    bool isElder = cluster.isElder(); // Call outside lock to avoid deadlock.
     sys::Mutex::ScopedLock l(lock);
     // Only the elder mcasts.
     // Don't mcast another if we haven't yet received the last one.
-    if (cluster.isElder() && !inFlight) {
+    if (isElder && !inFlight) {
+        QPID_LOG(trace, "Sending periodic-timer control for " << name);
         inFlight = true;
         cluster.getMulticast().mcastControl(
             framing::ClusterPeriodicTimerBody(framing::ProtocolVersion(), name),
             cluster.getId());
     }
-    setupNextFire();
-    timer.add(this);
 }
 
 void PeriodicTimerImpl::TaskEntry::deliver() {
@@ -59,6 +63,7 @@
     const Task& task, sys::Duration period, const std::string& name)
 {
     sys::Mutex::ScopedLock l(lock);
+    QPID_LOG(debug, "Periodic timer add entry for " << name);
     if (map.find(name) != map.end())
         throw Exception(QPID_MSG("Cluster timer task name added twice: " << name));
     map[name] = new TaskEntry(cluster, task, period, name);
@@ -72,6 +77,7 @@
         if (i == map.end())
             throw Exception(QPID_MSG("Cluster timer unknown task: " << name));
     }
+    QPID_LOG(debug, "Periodic timer execute " << name);
     i->second->deliver();
 }
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org