You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2011/12/14 22:45:52 UTC

svn commit: r1214490 - in /qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha: QueueReplicator.cpp ReplicatingSubscription.cpp ReplicatingSubscription.h

Author: aconway
Date: Wed Dec 14 21:45:52 2011
New Revision: 1214490

URL: http://svn.apache.org/viewvc?rev=1214490&view=rev
Log:
QPID-3603: Failover optimization removed.

There was an optimization to re-use messages already on the backup
after fail-over. This optimization was removed to simplify the logic
while we get basic replication working. It can be re-introduced
later. Last revision with the optimization was:

r1213258 | QPID-3603: Fix QueueReplicator subscription parameters.

Modified:
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1214490&r1=1214489&r2=1214490&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Wed Dec 14 21:45:52 2011
@@ -49,7 +49,6 @@ QueueReplicator::QueueReplicator(boost::
     : Exchange(QPID_REPLICATOR_+q->getName(), 0, 0), // FIXME aconway 2011-11-24: hidden from management?
       queue(q), link(l), current(queue->getPosition())
 {
-    // FIXME aconway 2011-11-24: consistent logging.
     QPID_LOG(debug, "HA: Replicating queue " << q->getName() << " " << q->getSettings());
     // Declare the replicator bridge.
     queue->getBroker()->getLinks().declare(
@@ -77,12 +76,20 @@ void QueueReplicator::initializeBridge(B
     framing::AMQP_ServerProxy peer(sessionHandler.out);
     const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
     framing::FieldTable settings;
+
+    // FIXME aconway 2011-12-09: Failover optimization removed.
+    // There was code here to re-use messages already on the backup
+    // during fail-over. This optimization was removed to simplify
+    // the logic till we get the basic replication stable, it
+    // can be re-introduced later. Last revision with the optimization:
+    // r1213258 | QPID-3603: Fix QueueReplicator subscription parameters.
+
+    // Clear out any old messages, reset the queue to start replicating fresh.
+    queue->purge();
+    queue->setPosition(0);
+
     settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1);
-    settings.setInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER, queue->getPosition());
     settings.setInt(QPID_SYNC_FREQUENCY, 1);
-    qpid::framing::SequenceNumber oldest;
-    if (queue->getOldest(oldest))
-        settings.setInt(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER, oldest);
     peer.getMessage().subscribe(args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, false, "", 0, settings);
     peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
     peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1214490&r1=1214489&r2=1214490&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Wed Dec 14 21:45:52 2011
@@ -34,44 +34,12 @@ using namespace broker;
 using namespace std;
 
 const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.replicating-subscription");
-const string ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER("qpid.high-sequence-number");
-const string ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER("qpid.low-sequence-number");
 
 namespace {
 const string DOLLAR("$");
 const string INTERNAL("-internal");
 } // namespace
 
-class ReplicationStateInitialiser
-{
-  public:
-    ReplicationStateInitialiser(
-        qpid::framing::SequenceSet& r,
-        const qpid::framing::SequenceNumber& s,
-        const qpid::framing::SequenceNumber& e) : dequeues(r), start(s), end(e)
-    {
-        dequeues.add(start, end);
-    }
-
-    void operator()(const QueuedMessage& message) {
-        if (message.position < start) {
-            //replica does not have a message that should still be on the queue
-            QPID_LOG(warning, "HA: Replica missing message " << QueuePos(message));
-            // FIXME aconway 2011-12-09: we want the replica to dump
-            // its messages and start from scratch in this case.
-        } else if (message.position >= start && message.position <= end) {
-            //i.e. message is within the intial range and has not been dequeued,
-            //so remove it from the dequeues
-            dequeues.remove(message.position);
-        } //else message has not been seen by replica yet so can be ignored here
-    }
-
-  private:
-    qpid::framing::SequenceSet& dequeues;
-    const qpid::framing::SequenceNumber start;
-    const qpid::framing::SequenceNumber end;
-};
-
 string mask(const string& in)
 {
     return DOLLAR + in + INTERNAL;
@@ -97,7 +65,6 @@ ReplicatingSubscription::Factory::create
         rs.reset(new ReplicatingSubscription(
                      parent, name, queue, ack, false, exclusive, tag,
                      resumeId, resumeTtl, arguments));
-        // FIXME aconway 2011-12-08: need to removeObserver also.
         queue->addObserver(rs);
     }
     return rs;
@@ -119,44 +86,23 @@ ReplicatingSubscription::ReplicatingSubs
     events(new Queue(mask(name))),
     consumer(new DelegatingConsumer(*this))
 {
-    // FIXME aconway 2011-12-09: Here we take advantage of existing
-    // messages on the backup queue to reduce replication
-    // effort. However if the backup queue is inconsistent with being
-    // a backup of the primary queue, then we want to issue a warning
-    // and tell the backup to dump its messages and start replicating
-    // from scratch.
+    // FIXME aconway 2011-12-09: Failover optimization removed.
+    // There was code here to re-use messages already on the backup
+    // during fail-over. This optimization was removed to simplify
+    // the logic till we get the basic replication stable, it
+    // can be re-introduced later. Last revision with the optimization:
+    // r1213258 | QPID-3603: Fix QueueReplicator subscription parameters.
+
     QPID_LOG(debug, "HA: Replicating subscription " << name << " to " << queue->getName());
-    if (arguments.isSet(QPID_HIGH_SEQUENCE_NUMBER)) {
-        qpid::framing::SequenceNumber hwm = arguments.getAsInt(QPID_HIGH_SEQUENCE_NUMBER);
-        qpid::framing::SequenceNumber lwm;
-        if (arguments.isSet(QPID_LOW_SEQUENCE_NUMBER)) {
-            lwm = arguments.getAsInt(QPID_LOW_SEQUENCE_NUMBER);
-        } else {
-            lwm = hwm;
-        }
-        qpid::framing::SequenceNumber oldest;
-        if (queue->getOldest(oldest)) {
-            if (oldest >= hwm) {
-                dequeues.add(lwm, --oldest);
-            } else if (oldest >= lwm) {
-                ReplicationStateInitialiser initialiser(dequeues, lwm, hwm);
-                queue->eachMessage(initialiser);
-            } else { //i.e. older message on master than is reported to exist on replica
-                // FIXME aconway 2011-12-09: dump and start from scratch?
-                QPID_LOG(warning, "HA: Replica missing message on primary");
-            }
-        } else {
-            //local queue (i.e. master) is empty
-            dequeues.add(lwm, queue->getPosition());
-            // FIXME aconway 2011-12-09: if hwm >
-            // queue->getPosition(), dump and start from scratch?
-        }
-        QPID_LOG(debug, "HA: Initial set of dequeues for " << queue->getName() << ": "
-                 << dequeues << " (lwm=" << lwm << ", hwm=" << hwm
-                 << ", current=" << queue->getPosition() << ")");
-        //set position of 'cursor'
-        position = hwm;
-    }
+    qpid::framing::SequenceNumber oldest;
+    if (queue->getOldest(oldest)) 
+        dequeues.add(0, --oldest);
+    else //local queue (i.e. master) is empty
+        dequeues.add(0, queue->getPosition());
+
+    QPID_LOG(debug, "HA: Initial dequeues for " << queue->getName() << ": " << dequeues);
+    // Set 'cursor' on backup queue. Will be updated by dequeue event sent above.
+    position = 0;
 }
 
 bool ReplicatingSubscription::deliver(QueuedMessage& m)

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1214490&r1=1214489&r2=1214490&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h Wed Dec 14 21:45:52 2011
@@ -61,8 +61,6 @@ class ReplicatingSubscription : public b
 
     // Argument names for consume command.
     static const std::string QPID_REPLICATING_SUBSCRIPTION;
-    static const std::string QPID_HIGH_SEQUENCE_NUMBER;
-    static const std::string QPID_LOW_SEQUENCE_NUMBER;
 
     ReplicatingSubscription(broker::SemanticState* parent,
                             const std::string& name, boost::shared_ptr<broker::Queue> ,



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org