You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2011/12/02 22:03:39 UTC

svn commit: r1209691 - in /qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha: Backup.cpp HaBroker.cpp HaPlugin.cpp QueueReplicator.cpp ReplicatingSubscription.cpp WiringReplicator.cpp

Author: aconway
Date: Fri Dec  2 21:03:38 2011
New Revision: 1209691

URL: http://svn.apache.org/viewvc?rev=1209691&view=rev
Log:
QPID-3603: Cleaned up HA log messages.

Modified:
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/Backup.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/HaBroker.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/HaPlugin.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/Backup.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1209691&r1=1209690&r2=1209691&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/Backup.cpp Fri Dec  2 21:03:38 2011
@@ -43,23 +43,20 @@ using types::Variant;
 using std::string;
 
 Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) {
-    // FIXME aconway 2011-11-24: identifying the primary.
-    if (s.brokerUrl != "primary") { // FIXME aconway 2011-11-22: temporary hack to identify primary.
-        Url url(s.brokerUrl);
-        QPID_LOG(info, "HA: Acting as backup");
-        string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
+    Url url(s.brokerUrl);
+    string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
 
-        // FIXME aconway 2011-11-17: TBD: link management, discovery, fail-over.
-        // Declare the link
-        std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare(
-            url[0].host, url[0].port, protocol,
-            false,              // durable
-            s.mechanism, s.username, s.password);
-        assert(result.second);  // FIXME aconway 2011-11-23: error handling
-        link = result.first;
-        boost::shared_ptr<WiringReplicator> wr(new WiringReplicator(link));
-        broker.getExchanges().registerExchange(wr);
-    }
+    // FIXME aconway 2011-11-17: TBD: link management, discovery, fail-over.
+    // Declare the link
+    std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare(
+        url[0].host, url[0].port, protocol,
+        false,              // durable
+        s.mechanism, s.username, s.password);
+    assert(result.second);  // FIXME aconway 2011-11-23: error handling
+    link = result.first;
+    boost::shared_ptr<WiringReplicator> wr(new WiringReplicator(link));
+    broker.getExchanges().registerExchange(wr);
 }
 
+
 }} // namespace qpid::ha

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1209691&r1=1209690&r2=1209691&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/HaBroker.cpp Fri Dec  2 21:03:38 2011
@@ -61,7 +61,9 @@ HaBroker::HaBroker(broker::Broker& b, co
     }
     QPID_LOG(notice, "HA: Initialized: client-url=" << clientUrl
              << " broker-url=" << brokerUrl);
-    backup.reset(new Backup(broker, s));
+    // FIXME aconway 2011-11-22: temporary hack to identify primary.
+    if (s.brokerUrl != "primary")
+        backup.reset(new Backup(broker, s));
     // Register a factory for replicating subscriptions.
     broker.getConsumerFactories().add(
         boost::shared_ptr<ReplicatingSubscription::Factory>(

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/HaPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/HaPlugin.cpp?rev=1209691&r1=1209690&r2=1209691&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/HaPlugin.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/HaPlugin.cpp Fri Dec  2 21:03:38 2011
@@ -58,7 +58,7 @@ struct HaPlugin : public Plugin {
         if (broker && settings.enabled) {
             haBroker.reset(new ha::HaBroker(*broker, settings));
         } else
-            QPID_LOG(info, "HA: Disabled");
+            QPID_LOG(notice, "HA: Disabled");
     }
 };
 

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1209691&r1=1209690&r2=1209691&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Fri Dec  2 21:03:38 2011
@@ -42,7 +42,6 @@ namespace qpid {
 namespace ha {
 using namespace broker;
 
-// FIXME aconway 2011-12-02: separate file for string constantS?
 const std::string QueueReplicator::DEQUEUE_EVENT_KEY("qpid.dequeue-event");
 
 QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l)
@@ -85,7 +84,7 @@ void QueueReplicator::initializeBridge(B
     peer.getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, settings);
     peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
     peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
-    QPID_LOG(debug, "HA: Activated route from queue " << args.i_src << " to " << args.i_dest);
+    QPID_LOG(debug, "HA: Backup activated bridge from queue " << args.i_src << " to " << args.i_dest);
 }
 
 void QueueReplicator::route(Deliverable& msg, const std::string& key, const qpid::framing::FieldTable* /*args*/)
@@ -102,18 +101,15 @@ void QueueReplicator::route(Deliverable&
             if (current < *i) {
                 //haven't got that far yet, record the dequeue
                 dequeued.add(*i);
-                QPID_LOG(trace, "HA: Recording dequeue of message at " <<
-                         QueuePos(queue.get(), *i));
+                QPID_LOG(trace, "HA: Recording dequeue of " << QueuePos(queue.get(), *i));
             } else {
                 QueuedMessage message;
                 if (queue->acquireMessageAt(*i, message)) {
                     queue->dequeue(0, message);
-                    QPID_LOG(info, "HA: Dequeued message "<< QueuePos(message));
+                    QPID_LOG(trace, "HA: Backup dequeued: "<< QueuePos(message));
                 } else {
-                    // FIXME aconway 2011-11-29: error handling
-                    // Is this an error? Will happen if queue has initial dequeues.
-                    QPID_LOG(error, "HA: Unable to dequeue message at "
-                             << QueuePos(queue.get(), *i));
+                    // This can happen if we're replicating a queue that has initial dequeues.
+                    QPID_LOG(trace, "HA: Backup message already dequeued: "<< QueuePos(queue.get(), *i));
                 }
             }
         }
@@ -122,10 +118,10 @@ void QueueReplicator::route(Deliverable&
         //dequeued before our subscription reached them
         while (dequeued.contains(++current)) {
             dequeued.remove(current);
-            QPID_LOG(debug, "HA: Skipping dequeued message at " << current << " from " << queue->getName());
+            QPID_LOG(trace, "HA: Backup skipping dequeued message: " << QueuePos(queue.get(), current));
             queue->setPosition(current);
         }
-        QPID_LOG(info, "HA: Enqueued message on " << queue->getName() << "; currently at " << current);
+        QPID_LOG(trace, "HA: Backup enqueued message: " << QueuePos(queue.get(), current));
         msg.deliverTo(queue);
     }
 }

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1209691&r1=1209690&r2=1209691&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Fri Dec  2 21:03:38 2011
@@ -33,8 +33,6 @@ using namespace framing;
 using namespace broker;
 using namespace std;
 
-// FIXME aconway 2011-11-28: review all arugment names, prefixes etc.
-// Do we want a common HA prefix?
 const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.replicating-subscription");
 const string ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER("qpid.high-sequence-number");
 const string ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER("qpid.low-sequence-number");
@@ -211,7 +209,7 @@ void ReplicatingSubscription::dequeued(c
     {
         sys::Mutex::ScopedLock l(lock);
         range.add(m.position);
-        // FIXME aconway 2011-11-29: q[pos]
+        // FIXME aconway 2011-11-29: q[pos] logging
         QPID_LOG(trace, "HA: Updated dequeue event to include " << QueuePos(m) << "; subscription is at " << position);
     }
     notify();

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp?rev=1209691&r1=1209690&r2=1209691&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp Fri Dec  2 21:03:38 2011
@@ -115,7 +115,6 @@ const string S_WIRING="wiring";
 const string S_ALL="all";
 
 ReplicateLevel replicateLevel(const string& str) {
-    // FIXME aconway 2011-11-24: case insenstive comparison.
     ReplicateLevel rl = RL_NONE;
     if (str == S_WIRING) rl = RL_WIRING;
     else if (str == S_ALL) rl = RL_ALL;
@@ -176,7 +175,7 @@ WiringReplicator::~WiringReplicator() {}
 WiringReplicator::WiringReplicator(const boost::shared_ptr<Link>& l)
     : Exchange(QPID_WIRING_REPLICATOR), broker(*l->getBroker()), link(l)
 {
-    QPID_LOG(debug, "HA: Starting replication from " <<
+    QPID_LOG(info, "HA: Backup replicating from " <<
              link->getTransport() << ":" << link->getHost() << ":" << link->getPort());
     broker.getLinks().declare(
         link->getHost(), link->getPort(),
@@ -212,10 +211,10 @@ void WiringReplicator::initializeBridge(
     sendQuery(QUEUE, queueName, sessionHandler);
     sendQuery(EXCHANGE, queueName, sessionHandler);
     sendQuery(BINDING, queueName, sessionHandler);
-    QPID_LOG(debug, "HA: Activated wiring replicator")
+    QPID_LOG(debug, "HA: Backup activated wiring bridge: " << queueName);
 }
 
-// FIXME aconway 2011-12-02: error  handling in route. Be forging but log warnings?
+// FIXME aconway 2011-12-02: error handling in route.
 void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const framing::FieldTable* headers) {
     Variant::List list;
     try {
@@ -230,7 +229,8 @@ void WiringReplicator::route(Deliverable
                 Variant::Map& map = i->asMap();
                 Variant::Map& schema = map[SCHEMA_ID].asMap();
                 Variant::Map& values = map[VALUES].asMap();
-                QPID_LOG(trace, "HA: Configuration event: schema=" << schema << " values=" << values);
+                QPID_LOG(debug, "HA: Backup received event: schema=" << schema
+                         << " values=" << values);
                 if      (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values);
                 else if (match<EventQueueDelete>(schema)) doEventQueueDelete(values);
                 else if (match<EventExchangeDeclare>(schema)) doEventExchangeDeclare(values);
@@ -238,7 +238,9 @@ void WiringReplicator::route(Deliverable
                 else if (match<EventBind>(schema)) doEventBind(values);
                 // FIXME aconway 2011-11-21: handle unbind & all other events.
                 else if (match<EventSubscribe>(schema)) {} // Deliberately ignored.
-                else throw(Exception(QPID_MSG("WiringReplicator received unexpected event, schema=" << schema)));
+                // FIXME aconway 2011-12-02: error handling
+                else throw(Exception(QPID_MSG("Backup received unexpected event, schema="
+                                              << schema)));
             }
         } else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) {
             for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
@@ -246,18 +248,20 @@ void WiringReplicator::route(Deliverable
                 Variant::Map& values = i->asMap()[VALUES].asMap();
                 framing::FieldTable args;
                 amqp_0_10::translate(values[ARGUMENTS].asMap(), args);
-                QPID_LOG(trace, "HA: Configuration response type=" << type << " values=" << values);
+                QPID_LOG(trace, "HA: Backup received response type=" << type
+                         << " values=" << values);
                 if      (type == QUEUE) doResponseQueue(values);
                 else if (type == EXCHANGE) doResponseExchange(values);
                 else if (type == BINDING) doResponseBind(values);
                 else throw Exception(QPID_MSG("HA: Unexpected response type: " << type));
             }
         } else {
-            QPID_LOG(warning, QPID_MSG("HA: Expecting remote configuration message, got: " << *headers));
+            QPID_LOG(error, QPID_MSG("HA: Backup received unexpected message: "
+                                       << *headers));
         }
     } catch (const std::exception& e) {
-        QPID_LOG(warning, "HA: Error replicating configuration: " << e.what());
-        QPID_LOG(debug, "HA: Error processing configuration message: " << list);
+        QPID_LOG(error, "HA: Backup replication error: " << e.what());
+        QPID_LOG(error, "HA: Backup replication error while processing: " << list);
     }
 }
 
@@ -267,9 +271,7 @@ void WiringReplicator::doEventQueueDecla
     if (values[DISP] == CREATED && replicateLevel(argsMap)) {
          framing::FieldTable args;
         amqp_0_10::translate(argsMap, args);
-
-        QPID_LOG(debug, "HA: Creating queue from event " << name);
-       std::pair<boost::shared_ptr<Queue>, bool> result =
+        std::pair<boost::shared_ptr<Queue>, bool> result =
             broker.createQueue(
                 name,
                 values[DURABLE].asBool(),
@@ -284,10 +286,11 @@ void WiringReplicator::doEventQueueDecla
             // re-create from event.
             // Events are always up to date, whereas responses may be
             // out of date.
-            QPID_LOG(debug, "HA: New queue replica " << name);
+            QPID_LOG(debug, "HA: Created backup queue from event: " << name);
             startQueueReplicator(result.first);
         } else {
-            QPID_LOG(warning, "HA: Replicated queue " << name << " already exists");
+            // FIXME aconway 2011-12-02: what's the right way to handle this?
+            QPID_LOG(warning, "HA: Queue already exists on backup: " << name);
         }
     }
 }
@@ -296,7 +299,7 @@ void WiringReplicator::doEventQueueDelet
     string name = values[QNAME].asString();
     boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
     if (queue && replicateLevel(queue->getSettings())) {
-        QPID_LOG(debug, "HA: Deleting queue from event: " << name);
+        QPID_LOG(debug, "HA: Deleting backup queue from event: " << name);
         broker.deleteQueue(
             name,
             values[USER].asString(),
@@ -310,18 +313,20 @@ void WiringReplicator::doEventExchangeDe
         string name = values[EXNAME].asString();
         framing::FieldTable args;
         amqp_0_10::translate(argsMap, args);
-        QPID_LOG(debug, "HA: New exchange replica " << name);
-        if (!broker.createExchange(
+        if (broker.createExchange(
                 name,
                 values[EXTYPE].asString(),
                 values[DURABLE].asBool(),
                 values[ALTEX].asString(),
                 args,
                 values[USER].asString(),
-                values[RHOST].asString()).second) {
+                values[RHOST].asString()).second)
+        {
+                    QPID_LOG(debug, "HA: created backup exchange from event: " << name);
+        } else {
             // FIXME aconway 2011-11-22: should delete pre-exisitng exchange
             // and re-create from event. See comment in doEventQueueDeclare.
-            QPID_LOG(warning, "HA: Replicated exchange " << name << " already exists");
+            QPID_LOG(warning, "HA: Exchange already exists on backup: " << name);
         }
     }
 }
@@ -331,7 +336,7 @@ void WiringReplicator::doEventExchangeDe
     try {
         boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name);
         if (exchange && replicateLevel(exchange->getArgs())) {
-            QPID_LOG(debug, "HA: Deleting exchange:" << name);
+            QPID_LOG(debug, "HA: Deleting backup exchange:" << name);
             broker.deleteExchange(
                 name,
                 values[USER].asString(),
@@ -378,12 +383,12 @@ void WiringReplicator::doResponseQueue(V
             ""/*TODO: who is the user?*/,
             ""/*TODO: what should we use as connection id?*/);
     if (result.second) {
-        QPID_LOG(debug, "HA: New queue replica: " << values[NAME] << " (in catch-up)");
+        QPID_LOG(debug, "HA: Created backup queue from response: " << values[NAME]);
         startQueueReplicator(result.first);
     } else {
         // FIXME aconway 2011-11-22: Normal to find queue already
         // exists if we're failing over.
-        QPID_LOG(warning, "HA: Replicated queue " << values[NAME] << " already exists (in catch-up)");
+        QPID_LOG(warning, "HA: Queue already exists on backup: " << name);
     }
 }
 
@@ -392,16 +397,18 @@ void WiringReplicator::doResponseExchang
     if (!replicateLevel(argsMap)) return;
     framing::FieldTable args;
     amqp_0_10::translate(argsMap, args);
-    QPID_LOG(debug, "HA: New exchange replica " << values[NAME] << " (in catch-up)");
-    if (!broker.createExchange(
+    if (broker.createExchange(
             values[NAME].asString(),
             values[TYPE].asString(),
             values[DURABLE].asBool(),
             ""/*TODO: need to include alternate-exchange*/,
             args,
             ""/*TODO: who is the user?*/,
-            ""/*TODO: what should we use as connection id?*/).second) {
-        QPID_LOG(warning, "HA: Replicated exchange " << values[QNAME] << " already exists (in catch-up)");
+            ""/*TODO: what should we use as connection id?*/).second)
+    {
+        QPID_LOG(debug, "HA: Created backup exchange from response: " << values[NAME]);
+    } else {
+        QPID_LOG(warning, "HA: Exchange already exists on backup:  " << values[QNAME]);
     }
 }
 
@@ -440,10 +447,10 @@ void WiringReplicator::doResponseBind(Va
         framing::FieldTable args;
         amqp_0_10::translate(values[ARGUMENTS].asMap(), args);
         string key = values[KEY].asString();
-        QPID_LOG(debug, "HA: Replicated binding exchange=" << exchange->getName()
+        exchange->bind(queue, key, &args);
+        QPID_LOG(debug, "HA: Created backup binding from response: exchange=" << exchange->getName()
                  << " queue=" << queue->getName()
                  << " key=" << key);
-        exchange->bind(queue, key, &args);
     }
 }
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org