You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2012/07/24 15:33:53 UTC

svn commit: r1365046 - in /qpid/branches/0.18/qpid/cpp/src/qpid: broker/Consumer.h ha/HaBroker.cpp ha/HaPlugin.cpp ha/Primary.cpp ha/QueueGuard.cpp ha/QueueGuard.h ha/ReplicatingSubscription.cpp

Author: aconway
Date: Tue Jul 24 13:33:52 2012
New Revision: 1365046

URL: http://svn.apache.org/viewvc?rev=1365046&view=rev
Log:
NO-JIRA: Fix typos, update comments, update log messages.

Modified:
    qpid/branches/0.18/qpid/cpp/src/qpid/broker/Consumer.h
    qpid/branches/0.18/qpid/cpp/src/qpid/ha/HaBroker.cpp
    qpid/branches/0.18/qpid/cpp/src/qpid/ha/HaPlugin.cpp
    qpid/branches/0.18/qpid/cpp/src/qpid/ha/Primary.cpp
    qpid/branches/0.18/qpid/cpp/src/qpid/ha/QueueGuard.cpp
    qpid/branches/0.18/qpid/cpp/src/qpid/ha/QueueGuard.h
    qpid/branches/0.18/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp

Modified: qpid/branches/0.18/qpid/cpp/src/qpid/broker/Consumer.h
URL: http://svn.apache.org/viewvc/qpid/branches/0.18/qpid/cpp/src/qpid/broker/Consumer.h?rev=1365046&r1=1365045&r2=1365046&view=diff
==============================================================================
--- qpid/branches/0.18/qpid/cpp/src/qpid/broker/Consumer.h (original)
+++ qpid/branches/0.18/qpid/cpp/src/qpid/broker/Consumer.h Tue Jul 24 13:33:52 2012
@@ -54,7 +54,9 @@ class Consumer
     bool preAcquires() const { return acquires; }
     const std::string& getName() const { return name; }
 
+    /**@return the position of the last message seen by this consumer */
     virtual framing::SequenceNumber getPosition() const  { return position; }
+
     virtual void setPosition(framing::SequenceNumber pos) { position = pos; }
 
     virtual bool deliver(QueuedMessage& msg) = 0;

Modified: qpid/branches/0.18/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.18/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1365046&r1=1365045&r2=1365046&view=diff
==============================================================================
--- qpid/branches/0.18/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/branches/0.18/qpid/cpp/src/qpid/ha/HaBroker.cpp Tue Jul 24 13:33:52 2012
@@ -71,6 +71,7 @@ HaBroker::HaBroker(broker::Broker& b, co
     // otherwise there's a window for a client to connect before we get to
     // initialize()
     if (settings.cluster) {
+        QPID_LOG(debug, logPrefix << "Rejecting client connections.");
         observer->setObserver(boost::shared_ptr<broker::ConnectionObserver>(
                               new BackupConnectionExcluder));
         broker.getConnectionObservers().add(observer);

Modified: qpid/branches/0.18/qpid/cpp/src/qpid/ha/HaPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.18/qpid/cpp/src/qpid/ha/HaPlugin.cpp?rev=1365046&r1=1365045&r2=1365046&view=diff
==============================================================================
--- qpid/branches/0.18/qpid/cpp/src/qpid/ha/HaPlugin.cpp (original)
+++ qpid/branches/0.18/qpid/cpp/src/qpid/ha/HaPlugin.cpp Tue Jul 24 13:33:52 2012
@@ -66,7 +66,7 @@ struct HaPlugin : public Plugin {
         broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
         if (broker) {
             // Must create the HaBroker in earlyInitialize so it can set up its
-            // connection observer before clients start conneting.
+            // connection observer before clients start connecting.
             haBroker.reset(new ha::HaBroker(*broker, settings));
             broker->addFinalizer(boost::bind(&HaPlugin::finalize, this));
         }

Modified: qpid/branches/0.18/qpid/cpp/src/qpid/ha/Primary.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.18/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1365046&r1=1365045&r2=1365046&view=diff
==============================================================================
--- qpid/branches/0.18/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/branches/0.18/qpid/cpp/src/qpid/ha/Primary.cpp Tue Jul 24 13:33:52 2012
@@ -129,11 +129,15 @@ void Primary::checkReady(Mutex::ScopedLo
 void Primary::checkReady(BackupMap::iterator i, Mutex::ScopedLock& l)  {
     if (i != backups.end() && i->second->isReady()) {
         BrokerInfo info = i->second->getBrokerInfo();
-        QPID_LOG(info, "Expected backup is ready: " << info);
         info.setStatus(READY);
+        QPID_LOG(info, "Expected backup is ready: " << info);
         haBroker.addBroker(info);
-        expectedBackups.erase(i->second);
-        checkReady(l);
+        if (expectedBackups.erase(i->second)) {
+            QPID_LOG(info, logPrefix << "Expected backup is ready: " << info);
+            checkReady(l);
+        }
+        else
+            QPID_LOG(info, logPrefix << "Backup is ready: " << info);
     }
 }
 
@@ -147,7 +151,7 @@ void Primary::timeoutExpectedBackups() {
         boost::shared_ptr<RemoteBackup> rb = *i;
         if (!rb->isConnected()) {
             BrokerInfo info = rb->getBrokerInfo();
-            QPID_LOG(error, "Expected backup timed out: " << info);
+            QPID_LOG(error, logPrefix << "Expected backup timed out: " << info);
             expectedBackups.erase(i++);
             backups.erase(info.getSystemId());
             rb->cancel();

Modified: qpid/branches/0.18/qpid/cpp/src/qpid/ha/QueueGuard.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.18/qpid/cpp/src/qpid/ha/QueueGuard.cpp?rev=1365046&r1=1365045&r2=1365046&view=diff
==============================================================================
--- qpid/branches/0.18/qpid/cpp/src/qpid/ha/QueueGuard.cpp (original)
+++ qpid/branches/0.18/qpid/cpp/src/qpid/ha/QueueGuard.cpp Tue Jul 24 13:33:52 2012
@@ -101,7 +101,7 @@ void QueueGuard::cancel() {
 
 void QueueGuard::attach(ReplicatingSubscription& rs) {
     Mutex::ScopedLock l(lock);
-     subscription = &rs;
+    subscription = &rs;
 }
 
 namespace {

Modified: qpid/branches/0.18/qpid/cpp/src/qpid/ha/QueueGuard.h
URL: http://svn.apache.org/viewvc/qpid/branches/0.18/qpid/cpp/src/qpid/ha/QueueGuard.h?rev=1365046&r1=1365045&r2=1365046&view=diff
==============================================================================
--- qpid/branches/0.18/qpid/cpp/src/qpid/ha/QueueGuard.h (original)
+++ qpid/branches/0.18/qpid/cpp/src/qpid/ha/QueueGuard.h Tue Jul 24 13:33:52 2012
@@ -79,13 +79,20 @@ class QueueGuard {
     void attach(ReplicatingSubscription&);
 
     /**
-     * Return the queue range at the time the QueueGuard was created.  The
-     * QueueGuard is created before the queue becomes active: either when a
-     * backup is promoted, or when a new queue is created on the primary.
+     * Return the un-guarded queue range at the time the QueueGuard was created.
+     *
+     * The first position guaranteed to be protected by the guard is
+     * getRange().getBack()+1. It is possible that the guard has protected some
+     * messages before that point. Any such messages are dealt with in subscriptionStart
+     *
+     * The QueueGuard is created in 3 situations
+     * - when a backup is promoted, guards are created for expected backups.
+     * - when a new queue is created on the primary
+     * - when a new backup joins.
+     *
+     * In the last situation the queue is active while the guard is being
+     * created.
      *
-     * NOTE: The first position guaranteed to be protected by the guard is
-     * getRange().getBack()+1. It is possible that the guard has protected
-     * some messages before that point.
      */
     const QueueRange& getRange() const { return range; } // range is immutable, no lock needed.
 

Modified: qpid/branches/0.18/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.18/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1365046&r1=1365045&r2=1365046&view=diff
==============================================================================
--- qpid/branches/0.18/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/branches/0.18/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Tue Jul 24 13:33:52 2012
@@ -175,13 +175,13 @@ ReplicatingSubscription::ReplicatingSubs
         logPrefix = os.str();
 
         // NOTE: Once the guard is attached we can have concurrent
-        // calls to dequeued so we need to lock use of this->deques.
+        // calls to dequeued so we need to lock use of this->dequeues.
         //
         // However we must attach the guard _before_ we scan for
         // initial dequeues to be sure we don't miss any dequeues
         // between the scan and attaching the guard.
         if (Primary::get()) guard = Primary::get()->getGuard(queue, info);
-        if (!guard) guard.reset(new QueueGuard(*queue, getBrokerInfo()));
+        if (!guard) guard.reset(new QueueGuard(*queue, info));
         guard->attach(*this);
 
         QueueRange backup(arguments); // The remote backup state.
@@ -213,6 +213,9 @@ ReplicatingSubscription::ReplicatingSubs
             scan.finish();
             position = backup.back;
         }
+        // NOTE: we are assuming that the messages that are on the backup are
+        // consistent with those on the primary. If the backup is a replica
+        // queue and hasn't been tampered with then that will be the case.
 
         QPID_LOG(debug, logPrefix << "Subscribed:"
                  << " backup:" << backup
@@ -332,7 +335,7 @@ void ReplicatingSubscription::sendDequeu
 void ReplicatingSubscription::dequeued(const QueuedMessage& qm)
 {
     assert (qm.queue == getQueue().get());
-     QPID_LOG(trace, logPrefix << "Dequeued " << qm);
+    QPID_LOG(trace, logPrefix << "Dequeued " << qm);
     {
         Mutex::ScopedLock l(lock);
         dequeues.add(qm.position);



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org