You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2014/08/20 00:34:16 UTC

svn commit: r1619003 - in /qpid/trunk/qpid/cpp/src: ./ qpid/broker/ qpid/ha/ tests/

Author: aconway
Date: Tue Aug 19 22:34:15 2014
New Revision: 1619003

URL: http://svn.apache.org/r1619003
Log:
QPID-6020: HA logging improvements - log prefix with status and ID.

Include broker status and ID in (almost) all logging messages.
Makes it much easier to track broker state and interactions.

Sundry other logging improvements including:
- Demote noisy messages to trace - connections from rgmanager status checks, searching for primary.
- Rationalise start-up messages.
- Improved queue state detail replicating subscription and queue guard initialization.
- Fail to prepare TX is error.
- Collect all primary TX errors into one.
- Fix status of catchup brokers in primary membership for logging.
- Add process name/PID info to client connection messages.
- Various minor message tweaks.

Added:
    qpid/trunk/qpid/cpp/src/qpid/ha/LogPrefix.cpp
      - copied, changed from r1618964, qpid/trunk/qpid/cpp/src/qpid/ha/StandAlone.h
    qpid/trunk/qpid/cpp/src/qpid/ha/LogPrefix.h
Modified:
    qpid/trunk/qpid/cpp/src/CMakeLists.txt
    qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/Backup.h
    qpid/trunk/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.h
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h
    qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.h
    qpid/trunk/qpid/cpp/src/qpid/ha/FailoverExchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h
    qpid/trunk/qpid/cpp/src/qpid/ha/HaPlugin.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/IdSetter.h
    qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/Membership.h
    qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h
    qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h
    qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h
    qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
    qpid/trunk/qpid/cpp/src/qpid/ha/Role.h
    qpid/trunk/qpid/cpp/src/qpid/ha/StandAlone.h
    qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.h
    qpid/trunk/qpid/cpp/src/tests/qpid-txtest2.cpp

Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=1619003&r1=1619002&r2=1619003&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Tue Aug 19 22:34:15 2014
@@ -522,7 +522,6 @@ option(BUILD_HA "Build Active-Passive HA
 
 if (BUILD_HA)
     set (ha_SOURCES
-	qpid/ha/QueueSnapshot.h
         qpid/ha/AlternateExchangeSetter.h
         qpid/ha/Backup.cpp
         qpid/ha/Backup.h
@@ -533,24 +532,29 @@ if (BUILD_HA)
         qpid/ha/BrokerReplicator.h
         qpid/ha/ConnectionObserver.cpp
         qpid/ha/ConnectionObserver.h
-	qpid/ha/Event.cpp
-	qpid/ha/Event.h
+        qpid/ha/Event.cpp
+        qpid/ha/Event.h
         qpid/ha/FailoverExchange.cpp
         qpid/ha/FailoverExchange.h
         qpid/ha/HaBroker.cpp
         qpid/ha/HaBroker.h
         qpid/ha/HaPlugin.cpp
-	qpid/ha/IdSetter.h
-        qpid/ha/QueueSnapshot.h
+        qpid/ha/IdSetter.h
+        qpid/ha/LogPrefix.cpp
+        qpid/ha/LogPrefix.h
         qpid/ha/Membership.cpp
         qpid/ha/Membership.h
         qpid/ha/Primary.cpp
         qpid/ha/Primary.h
         qpid/ha/PrimaryQueueLimits.h
+        qpid/ha/PrimaryTxObserver.cpp
+        qpid/ha/PrimaryTxObserver.h
         qpid/ha/QueueGuard.cpp
         qpid/ha/QueueGuard.h
         qpid/ha/QueueReplicator.cpp
         qpid/ha/QueueReplicator.h
+        qpid/ha/QueueSnapshot.h
+        qpid/ha/QueueSnapshot.h
         qpid/ha/RemoteBackup.cpp
         qpid/ha/RemoteBackup.h
         qpid/ha/ReplicatingSubscription.cpp
@@ -564,11 +568,9 @@ if (BUILD_HA)
         qpid/ha/StatusCheck.h
         qpid/ha/TxReplicatingSubscription.cpp
         qpid/ha/TxReplicatingSubscription.h
-        qpid/ha/PrimaryTxObserver.cpp
-        qpid/ha/PrimaryTxObserver.h
+        qpid/ha/TxReplicator.cpp
+        qpid/ha/TxReplicator.h
         qpid/ha/types.cpp
-	qpid/ha/TxReplicator.cpp
-	qpid/ha/TxReplicator.h
         qpid/ha/types.h
     )
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp?rev=1619003&r1=1619002&r2=1619003&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp Tue Aug 19 22:34:15 2014
@@ -96,7 +96,8 @@ std::string TxBuffer::endCommit(Transact
 void TxBuffer::setError(const std::string& e) {
     QPID_LOG(error, "Asynchronous transaction error: " << e);
     sys::Mutex::ScopedLock l(errorLock);
-    error = e;
+    if (!error.empty()) error += " ";
+    error += e;
 }
 
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1619003&r1=1619002&r2=1619003&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp Tue Aug 19 22:34:15 2014
@@ -49,7 +49,7 @@ using std::string;
 using sys::Mutex;
 
 Backup::Backup(HaBroker& hb, const Settings& s) :
-    logPrefix("Backup: "), membership(hb.getMembership()), stopped(false),
+    logPrefix(hb.logPrefix), membership(hb.getMembership()), stopped(false),
     haBroker(hb), broker(hb.getBroker()), settings(s),
     statusCheck(new StatusCheck(hb))
 {}
@@ -60,7 +60,7 @@ void Backup::setBrokerUrl(const Url& bro
     if (stopped) return;
     if (haBroker.getStatus() == JOINING) statusCheck->setUrl(brokers);
     if (!link) {                // Not yet initialized
-        QPID_LOG(info, logPrefix << "Connecting to cluster, broker URL: " << brokers);
+        QPID_LOG(info, logPrefix << "Connecting to cluster: " << brokers);
         string protocol = brokers[0].protocol.empty() ? "tcp" : brokers[0].protocol;
         types::Uuid uuid(true);
         link = broker.getLinks().declare(
@@ -78,7 +78,6 @@ void Backup::setBrokerUrl(const Url& bro
 void Backup::stop(Mutex::ScopedLock&) {
     if (stopped) return;
     stopped = true;
-    QPID_LOG(debug, logPrefix << "Leaving backup role.");
     if (link) link->close();
     if (replicator.get()) {
         replicator->shutdown();
@@ -106,8 +105,7 @@ Role* Backup::promote() {
       case JOINING:
         if (statusCheck->canPromote()) return recover(l);
         else {
-            QPID_LOG(error,
-                     logPrefix << "Joining active cluster, cannot be promoted.");
+            QPID_LOG(error, logPrefix << "Joining active cluster, cannot be promoted.");
             throw Exception("Joining active cluster, cannot be promoted.");
         }
         break;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Backup.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Backup.h?rev=1619003&r1=1619002&r2=1619003&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Backup.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Backup.h Tue Aug 19 22:34:15 2014
@@ -22,6 +22,7 @@
  *
  */
 
+#include "LogPrefix.h"
 #include "Role.h"
 #include "Settings.h"
 #include "qpid/Url.h"
@@ -53,8 +54,6 @@ class Backup : public Role
     Backup(HaBroker&, const Settings&);
     ~Backup();
 
-    std::string getLogPrefix() const { return logPrefix; }
-
     void setBrokerUrl(const Url&);
 
     Role* promote();
@@ -65,7 +64,7 @@ class Backup : public Role
     void stop(sys::Mutex::ScopedLock&);
     Role* recover(sys::Mutex::ScopedLock&);
 
-    std::string logPrefix;
+    const LogPrefix& logPrefix;
     Membership& membership;
 
     sys::Mutex lock;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h?rev=1619003&r1=1619002&r2=1619003&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h Tue Aug 19 22:34:15 2014
@@ -22,6 +22,7 @@
  *
  */
 
+#include "LogPrefix.h"
 #include "qpid/broker/ConnectionObserver.h"
 #include "qpid/broker/Connection.h"
 #include "qpid/log/Statement.h"
@@ -35,12 +36,17 @@ namespace ha {
 class BackupConnectionExcluder : public broker::ConnectionObserver
 {
   public:
+    BackupConnectionExcluder(const LogPrefix& lp) : logPrefix(lp) {}
+
     void opened(broker::Connection& connection) {
-        QPID_LOG(debug, "Backup: Rejected connection "+connection.getMgmtId());
+        QPID_LOG(trace, logPrefix << "Rejected connection "+connection.getMgmtId());
         connection.abort();
     }
 
     void closed(broker::Connection&) {}
+
+  private:
+    const LogPrefix& logPrefix;
 };
 
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp?rev=1619003&r1=1619002&r2=1619003&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp Tue Aug 19 22:34:15 2014
@@ -92,7 +92,7 @@ void BrokerInfo::assign(const Variant::M
 }
 
 std::ostream& BrokerInfo::printId(std::ostream& o) const {
-    o  << getSystemId().str().substr(0,8);
+    o  << shortStr(getSystemId());
     if (getAddress() != empty) o << "@" << getAddress();
     return o;
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.h?rev=1619003&r1=1619002&r2=1619003&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.h Tue Aug 19 22:34:15 2014
@@ -63,9 +63,14 @@ class BrokerInfo
     void assign(const types::Variant::Map&);
 
     // So it can be put in a set.
-    bool operator<(const BrokerInfo x) const { return systemId < x.systemId; }
+    bool operator<(const BrokerInfo& x) const { return systemId < x.systemId; }
 
-    // Print just the identifying information, not the status.
+    bool operator==(const BrokerInfo& x) const
+    { return address == x.address && systemId == x.systemId && status == x.status; }
+
+    bool operator!=(const BrokerInfo& x) const { return !(*this == x); }
+
+    // Print just the identifying information (shortId@address), not the status.
     std::ostream& printId(std::ostream& o) const;
 
   private:

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1619003&r1=1619002&r2=1619003&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Tue Aug 19 22:34:15 2014
@@ -175,7 +175,7 @@ Variant::Map asMapVoid(const Variant& va
 // Report errors on the broker replication session.
 class BrokerReplicator::ErrorListener : public broker::SessionHandler::ErrorListener {
   public:
-    ErrorListener(const std::string& logPrefix_) : logPrefix(logPrefix_) {}
+    ErrorListener(const LogPrefix& lp) : logPrefix(lp) {}
 
     void connectionException(framing::connection::CloseCode code, const std::string& msg) {
         QPID_LOG(error, logPrefix << framing::createConnectionException(code, msg).what());
@@ -189,12 +189,10 @@ class BrokerReplicator::ErrorListener : 
     void incomingExecutionException(framing::execution::ErrorCode code, const std::string& msg) {
         QPID_LOG(error, logPrefix << "Incoming " << framing::createSessionException(code, msg).what());
     }
-    void detach() {
-        QPID_LOG(debug, logPrefix << "Session detached.");
-    }
+    void detach() {}
 
   private:
-    std::string logPrefix;
+    const LogPrefix& logPrefix;
 };
 
 /** Keep track of queues or exchanges during the update process to solve 2
@@ -213,8 +211,9 @@ class BrokerReplicator::UpdateTracker {
     typedef boost::function<void (const std::string&)> CleanFn;
 
     UpdateTracker(const std::string& type_, // "queue" or "exchange"
-                  CleanFn f)
-        : type(type_), cleanFn(f) {}
+                  CleanFn f,
+                  const LogPrefix& lp)
+        : type(type_), cleanFn(f), logPrefix(lp) {}
 
     /** Destructor cleans up remaining initial queues. */
     ~UpdateTracker() {
@@ -224,7 +223,7 @@ class BrokerReplicator::UpdateTracker {
                      boost::bind(&UpdateTracker::clean, this, _1));
         }
         catch (const std::exception& e) {
-            QPID_LOG(error, "Error in cleanup of lost objects: " << e.what());
+            QPID_LOG(error, logPrefix << "Error in cleanup of lost objects: " << e.what());
         }
     }
 
@@ -251,7 +250,7 @@ class BrokerReplicator::UpdateTracker {
 
   private:
     void clean(const std::string& name) {
-        QPID_LOG(debug, "Backup: Deleted " << type << " " << name <<
+        QPID_LOG(debug, logPrefix << "Deleted " << type << " " << name <<
                  ": no longer exists on primary");
         try { cleanFn(name); }
         catch (const framing::NotFoundException&) {}
@@ -260,6 +259,7 @@ class BrokerReplicator::UpdateTracker {
     std::string type;
     Names initial, events;
     CleanFn cleanFn;
+    const LogPrefix& logPrefix;
 };
 
 namespace {
@@ -279,7 +279,7 @@ boost::shared_ptr<BrokerReplicator> Brok
 
 BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l)
     : Exchange(QPID_CONFIGURATION_REPLICATOR),
-      logPrefix("Backup: "), replicationTest(NONE),
+      logPrefix(hb.logPrefix), replicationTest(NONE),
       haBroker(hb), broker(hb.getBroker()),
       exchanges(broker.getExchanges()), queues(broker.getQueues()),
       link(l),
@@ -372,19 +372,20 @@ void BrokerReplicator::connected(Bridge&
     link->getRemoteAddress(primary);
     string queueName = bridge.getQueueName();
 
-    QPID_LOG(notice, logPrefix << (initialized ? "Failing over" : "Connecting")
-             << " to primary " << primary
-             << " status:" << printable(haBroker.getStatus()));
+    QPID_LOG(info, logPrefix << (initialized ? "Failing over" : "Connecting")
+             << " to primary " << primary);
     initialized = true;
 
     exchangeTracker.reset(
         new UpdateTracker("exchange",
-                          boost::bind(&BrokerReplicator::deleteExchange, this, _1)));
+                          boost::bind(&BrokerReplicator::deleteExchange, this, _1),
+                          logPrefix));
     exchanges.eachExchange(boost::bind(&BrokerReplicator::existingExchange, this, _1));
 
     queueTracker.reset(
         new UpdateTracker("queue",
-                          boost::bind(&BrokerReplicator::deleteQueue, this, _1, true)));
+                          boost::bind(&BrokerReplicator::deleteQueue, this, _1, true),
+                          logPrefix));
     queues.eachQueue(boost::bind(&BrokerReplicator::existingQueue, this, _1));
 
     framing::AMQP_ServerProxy peer(sessionHandler.out);
@@ -417,14 +418,14 @@ void BrokerReplicator::connected(Bridge&
 // Called for each queue in existence when the backup connects to a primary.
 void BrokerReplicator::existingQueue(const boost::shared_ptr<Queue>& q) {
     if (replicationTest.getLevel(*q)) {
-        QPID_LOG(debug, "Existing queue: " << q->getName());
+        QPID_LOG(debug, logPrefix << "Existing queue: " << q->getName());
         queueTracker->addQueue(q);
     }
 }
 
 void BrokerReplicator::existingExchange(const boost::shared_ptr<Exchange>& ex) {
     if (replicationTest.getLevel(*ex)) {
-        QPID_LOG(debug, "Existing exchange: " << ex->getName());
+        QPID_LOG(debug, logPrefix << "Existing exchange: " << ex->getName());
         exchangeTracker->addExchange(ex);
     }
 }
@@ -447,7 +448,7 @@ void BrokerReplicator::route(Deliverable
         if (msg.getMessage().getPropertyAsString(QMF_CONTENT) == EVENT) {
             for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
                 Variant::Map& map = i->asMap();
-                QPID_LOG(trace, "Broker replicator event: " << map);
+                QPID_LOG(trace, logPrefix << "Broker replicator event: " << map);
                 Variant::Map& schema = map[SCHEMA_ID].asMap();
                 Variant::Map& values = map[VALUES].asMap();
                 std::string key = (schema[PACKAGE_NAME].asString() +
@@ -459,7 +460,7 @@ void BrokerReplicator::route(Deliverable
         } else if (msg.getMessage().getPropertyAsString(QMF_OPCODE) == QUERY_RESPONSE) {
             for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
                 Variant::Map& map = i->asMap();
-                QPID_LOG(trace, "Broker replicator response: " << map);
+                QPID_LOG(trace, logPrefix << "Broker replicator response: " << map);
                 string type = map[SCHEMA_ID].asMap()[CLASS_NAME].asString();
                 Variant::Map& values = map[VALUES].asMap();
                 framing::FieldTable args;
@@ -691,8 +692,7 @@ void BrokerReplicator::doResponseExchang
     if (exchange &&
         exchange->getArgs().getAsString(QPID_HA_UUID) != args.getAsString(QPID_HA_UUID))
     {
-        QPID_LOG(warning, logPrefix << "UUID mismatch, replacing exchange: "
-                 << name);
+        QPID_LOG(warning, logPrefix << "Exchange response replacing (UUID mismatch): " << name);
         deleteExchange(name);
     }
     CreateExchangeResult result = createExchange(
@@ -793,21 +793,17 @@ void BrokerReplicator::deleteQueue(const
 }
 
 void BrokerReplicator::deleteExchange(const std::string& name) {
-    try {
-        boost::shared_ptr<broker::Exchange> exchange = exchanges.find(name);
-        if (!exchange) {
-            QPID_LOG(warning, logPrefix << "Cannot delete exchange, not found: " << name);
-            return;
-        }
-        if (exchange->inUseAsAlternate()) {
-            QPID_LOG(warning, "Cannot delete exchange, in use as alternate: " << name);
-            return;
-        }
-        broker.deleteExchange(name, userId, remoteHost);
-        QPID_LOG(debug, logPrefix << "Exchange deleted: " << name);
-    } catch (const framing::NotFoundException&) {
-        QPID_LOG(debug, logPrefix << "Exchange not found for deletion: " << name);
+    boost::shared_ptr<broker::Exchange> exchange = exchanges.find(name);
+    if (!exchange) {
+        QPID_LOG(warning, logPrefix << "Cannot delete exchange, not found: " << name);
+        return;
+    }
+    if (exchange->inUseAsAlternate()) {
+        QPID_LOG(warning, logPrefix << "Cannot delete exchange, in use as alternate: " << name);
+        return;
     }
+    broker.deleteExchange(name, userId, remoteHost);
+    QPID_LOG(debug, logPrefix << "Exchange deleted: " << name);
 }
 
 boost::shared_ptr<QueueReplicator> BrokerReplicator::replicateQueue(

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h?rev=1619003&r1=1619002&r2=1619003&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h Tue Aug 19 22:34:15 2014
@@ -52,6 +52,7 @@ class FieldTable;
 }
 
 namespace ha {
+class LogPrefix;
 class HaBroker;
 class QueueReplicator;
 
@@ -155,7 +156,7 @@ class BrokerReplicator : public broker::
 
     void setMembership(const types::Variant::List&); // Set membership from list.
 
-    std::string logPrefix;
+    const LogPrefix& logPrefix;
     ReplicationTest replicationTest;
     std::string userId, remoteHost;
     HaBroker& haBroker;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp?rev=1619003&r1=1619002&r2=1619003&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp Tue Aug 19 22:34:15 2014
@@ -31,7 +31,7 @@ namespace qpid {
 namespace ha {
 
 ConnectionObserver::ConnectionObserver(HaBroker& hb, const types::Uuid& uuid)
-    : haBroker(hb), logPrefix("Backup: "), self(uuid) {}
+    : haBroker(hb), logPrefix(hb.logPrefix), self(uuid) {}
 
 bool ConnectionObserver::getBrokerInfo(const broker::Connection& connection, BrokerInfo& info) {
     qpid::types::Variant::Map::const_iterator i = connection.getClientProperties().find(ConnectionObserver::BACKUP_TAG);
@@ -55,11 +55,10 @@ bool ConnectionObserver::getAddress(cons
     return false;
 }
 
-void ConnectionObserver::setObserver(const ObserverPtr& o, const std::string& newlogPrefix)
+void ConnectionObserver::setObserver(const ObserverPtr& o)
 {
     sys::Mutex::ScopedLock l(lock);
     observer = o;
-    logPrefix = newlogPrefix;
 }
 
 ConnectionObserver::ObserverPtr ConnectionObserver::getObserver() {
@@ -83,21 +82,21 @@ void ConnectionObserver::opened(broker::
             // Set my own address if there is an address header.
             Address addr;
             if (getAddress(connection, addr)) haBroker.setAddress(addr);
-            QPID_LOG(debug, logPrefix << "Rejected self connection "+connection.getMgmtId());
+            QPID_LOG(trace, logPrefix << "Rejected self connection "+connection.getMgmtId());
             connection.abort();
             return;
         }
         if (connection.isLink()) return; // Allow outgoing links.
         if (connection.getClientProperties().find(ADMIN_TAG) != connection.getClientProperties().end()) {
-            QPID_LOG(debug, logPrefix << "Accepted admin connection: "
-                     << connection.getMgmtId());
+            QPID_LOG(trace, logPrefix << "Accepted admin connection: " << connection.getMgmtId());
             return;                 // No need to call observer, always allow admins.
         }
         ObserverPtr o(getObserver());
         if (o) o->opened(connection);
     }
     catch (const std::exception& e) {
-        QPID_LOG(error, logPrefix << "Open error: " << e.what());
+        QPID_LOG(error, logPrefix << "Error on incoming connection " << connection.getMgmtId()
+                 << ": " << e.what());
         throw;
     }
 }
@@ -109,7 +108,8 @@ void ConnectionObserver::closed(broker::
         if (o) o->closed(connection);
     }
     catch (const std::exception& e) {
-        QPID_LOG(error, logPrefix << "Close error: " << e.what());
+        QPID_LOG(error, logPrefix << "Error closing incoming connection " << connection.getMgmtId()
+                 << ": " << e.what());
         throw;
     }
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.h?rev=1619003&r1=1619002&r2=1619003&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.h Tue Aug 19 22:34:15 2014
@@ -34,6 +34,7 @@ struct Address;
 namespace ha {
 class BrokerInfo;
 class HaBroker;
+class LogPrefix;
 
 /**
  * Observes connections, delegates to another ConnectionObserver for
@@ -59,7 +60,7 @@ class ConnectionObserver : public broker
 
     ConnectionObserver(HaBroker& haBroker, const types::Uuid& self);
 
-    void setObserver(const ObserverPtr&, const std::string& logPrefix);
+    void setObserver(const ObserverPtr&);
     ObserverPtr getObserver();
 
     void reset();
@@ -72,7 +73,7 @@ class ConnectionObserver : public broker
 
     sys::Mutex lock;
     HaBroker& haBroker;
-    std::string logPrefix;
+    const LogPrefix& logPrefix;
     ObserverPtr observer;
     types::Uuid self;
 };

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/FailoverExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/FailoverExchange.cpp?rev=1619003&r1=1619002&r2=1619003&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/FailoverExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/FailoverExchange.cpp Tue Aug 19 22:34:15 2014
@@ -63,7 +63,6 @@ ostream& operator<<(ostream& o, const Os
 FailoverExchange::FailoverExchange(management::Manageable& parent, Broker& b)
     : Exchange(typeName, &parent, &b)
 {
-    QPID_LOG(debug, typeName << " created.");
     if (mgmtExchange != 0)
         mgmtExchange->set_type(typeName);
 }
@@ -114,7 +113,7 @@ bool FailoverExchange::hasBindings() {
 }
 
 void FailoverExchange::route(Deliverable&) {
-    QPID_LOG(warning, "Message received by exchange " << typeName << " ignoring");
+    QPID_LOG(warning, typeName << " unexpected message, ignored.");
 }
 
 void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue, sys::Mutex::ScopedLock&) {

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1619003&r1=1619002&r2=1619003&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp Tue Aug 19 22:34:15 2014
@@ -69,10 +69,16 @@ using boost::dynamic_pointer_cast;
 //
 class HaBroker::BrokerObserver : public broker::BrokerObserver {
   public:
+    BrokerObserver(const LogPrefix& lp) : logPrefix(lp) {}
+
     void queueCreate(const boost::shared_ptr<broker::Queue>& q) {
         q->getObservers().add(boost::shared_ptr<QueueSnapshot>(new QueueSnapshot));
-        q->getMessageInterceptors().add(boost::shared_ptr<IdSetter>(new IdSetter(q->getName())));
+        q->getMessageInterceptors().add(
+            boost::shared_ptr<IdSetter>(new IdSetter(logPrefix, q->getName())));
     }
+
+  private:
+    const LogPrefix& logPrefix;
 };
 
 // Called in Plugin::earlyInitialize
@@ -83,20 +89,19 @@ HaBroker::HaBroker(broker::Broker& b, co
       broker(b),
       observer(new ConnectionObserver(*this, systemId)),
       role(new StandAlone),
-      membership(BrokerInfo(systemId, STANDALONE), *this),
+      membership(BrokerInfo(systemId, STANDALONE), *this), // Sets logPrefix
       failoverExchange(new FailoverExchange(*b.GetVhostObject(), b))
 {
     // If we are joining a cluster we must start excluding clients now,
     // otherwise there's a window for a client to connect before we get to
     // initialize()
     if (settings.cluster) {
-        QPID_LOG(debug, "Backup starting, rejecting client connections.");
-        shared_ptr<broker::ConnectionObserver> excluder(new BackupConnectionExcluder);
-        observer->setObserver(excluder, "Backup: ");
+        shared_ptr<broker::ConnectionObserver> excluder(new BackupConnectionExcluder(logPrefix));
+        observer->setObserver(excluder);
         broker.getConnectionObservers().add(observer);
         broker.getExchanges().registerExchange(failoverExchange);
     }
-    broker.getBrokerObservers().add(boost::shared_ptr<BrokerObserver>(new BrokerObserver()));
+    broker.getBrokerObservers().add(boost::shared_ptr<BrokerObserver>(new BrokerObserver(logPrefix)));
 }
 
 namespace {
@@ -107,8 +112,8 @@ bool isNone(const std::string& x) { retu
 // Called in Plugin::initialize
 void HaBroker::initialize() {
     if (settings.cluster) {
+        QPID_LOG(info, logPrefix << "Starting HA broker");
         membership.setStatus(JOINING);
-        QPID_LOG(info, "Initializing HA broker: " << membership.getSelf());
     }
 
     // Set up the management object.
@@ -138,7 +143,6 @@ void HaBroker::initialize() {
 }
 
 HaBroker::~HaBroker() {
-    QPID_LOG(notice, role->getLogPrefix() << "Shut down");
     broker.getConnectionObservers().remove(observer);
 }
 
@@ -160,7 +164,7 @@ Manageable::status_t HaBroker::Managemen
       case _qmf::HaBroker::METHOD_REPLICATE: {
           _qmf::ArgsHaBrokerReplicate& bq_args =
               dynamic_cast<_qmf::ArgsHaBrokerReplicate&>(args);
-          QPID_LOG(debug, role->getLogPrefix() << "Replicate individual queue "
+          QPID_LOG(debug, logPrefix << "Replicate individual queue "
                    << bq_args.i_queue << " from " << bq_args.i_broker);
 
           shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue);
@@ -195,7 +199,7 @@ void HaBroker::setPublicUrl(const Url& u
     knownBrokers.push_back(url);
     vector<Url> urls(1, url);
     failoverExchange->updateUrls(urls);
-    QPID_LOG(debug, role->getLogPrefix() << "Public URL set to: " << url);
+    QPID_LOG(debug, logPrefix << "Public URL set to: " << url);
 }
 
 void HaBroker::setBrokerUrl(const Url& url) {
@@ -203,7 +207,7 @@ void HaBroker::setBrokerUrl(const Url& u
         Mutex::ScopedLock l(lock);
         brokerUrl = url;
         mgmtObject->set_brokersUrl(brokerUrl.str());
-        QPID_LOG(info, role->getLogPrefix() << "Brokers URL set to: " << url);
+        QPID_LOG(info, logPrefix << "Brokers URL set to: " << url);
     }
     role->setBrokerUrl(url); // Oustside lock
 }
@@ -214,7 +218,7 @@ std::vector<Url> HaBroker::getKnownBroke
 }
 
 void HaBroker::shutdown(const std::string& message) {
-    QPID_LOG(critical, "Shutting down: " << message);
+    QPID_LOG(critical, logPrefix << "Shutting down: " << message);
     broker.shutdown();
     throw Exception(message);
 }
@@ -224,7 +228,7 @@ BrokerStatus HaBroker::getStatus() const
 }
 
 void HaBroker::setAddress(const Address& a) {
-    QPID_LOG(info, role->getLogPrefix() << "Set self address to: " << a);
+    QPID_LOG(info, logPrefix << "Set self address to: " << a);
     membership.setSelfAddress(a);
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h?rev=1619003&r1=1619002&r2=1619003&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h Tue Aug 19 22:34:15 2014
@@ -25,6 +25,7 @@
 #include "BrokerInfo.h"
 #include "Membership.h"
 #include "types.h"
+#include "LogPrefix.h"
 #include "Settings.h"
 #include "qpid/Url.h"
 #include "FailoverExchange.h"
@@ -101,6 +102,9 @@ class HaBroker : public management::Mana
     /** Authenticated user ID for queue create/delete */
     std::string getUserId() const { return userId; }
 
+    /** logPrefix is thread safe and used by other classes (Membership) */
+    LogPrefix logPrefix;
+
   private:
     class BrokerObserver;
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaPlugin.cpp?rev=1619003&r1=1619002&r2=1619003&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaPlugin.cpp Tue Aug 19 22:34:15 2014
@@ -76,7 +76,7 @@ struct HaPlugin : public Plugin {
         broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
         if (broker && (settings.cluster || settings.queueReplication)) {
             if (!broker->getManagementAgent()) {
-                QPID_LOG(warning, "HA plugin disabled because management is disabled");
+                QPID_LOG(warning, "Cannot start HA: management is disabled");
                 if (settings.cluster)
                     throw Exception("Cannot start HA: management is disabled");
             } else {

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/IdSetter.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/IdSetter.h?rev=1619003&r1=1619002&r2=1619003&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/IdSetter.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/IdSetter.h Tue Aug 19 22:34:15 2014
@@ -33,6 +33,7 @@
 
 namespace qpid {
 namespace ha {
+class LogPrefix;
 
 /**
  * A MessageInterceptor that sets the ReplicationId on each message as it is
@@ -43,16 +44,16 @@ namespace ha {
 class IdSetter : public broker::MessageInterceptor
 {
   public:
-    IdSetter(const std::string& q, ReplicationId firstId=1) : queue(q), nextId(firstId) {
-        QPID_LOG(debug, "Replication-ID will be set for " << queue << " from " << firstId);
-    }
+    IdSetter(const LogPrefix& lp, const std::string& q, ReplicationId firstId=1) :
+        logPrefix(lp), queue(q), nextId(firstId)
+    {}
 
     void record(broker::Message& m) {
         // Record is called when a message is first delivered to a queue, before it has
         // been enqueued or saved in a transaction buffer. This is when we normally want
         // to assign a replication-id.
         m.setReplicationId(nextId++);
-        QPID_LOG(trace, "Replication-ID set: " << logMessageId(queue, m.getReplicationId()));
+        QPID_LOG(trace, logPrefix << "Replication-ID set: " << logMessageId(queue, m.getReplicationId()));
     }
 
     void publish(broker::Message& m) {
@@ -62,11 +63,12 @@ class IdSetter : public broker::MessageI
         // store record() is not called, so set the ID now if not already set.
         if (!m.hasReplicationId()) {
             m.setReplicationId(nextId++);
-            QPID_LOG(trace, "Replication-ID set: " << logMessageId(queue, m));
+            QPID_LOG(trace, logPrefix << "Replication-ID set: " << logMessageId(queue, m));
         }
     }
 
   private:
+    const LogPrefix& logPrefix;
     std::string queue;
     sys::AtomicValue<uint32_t> nextId;
 };

Copied: qpid/trunk/qpid/cpp/src/qpid/ha/LogPrefix.cpp (from r1618964, qpid/trunk/qpid/cpp/src/qpid/ha/StandAlone.h)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/LogPrefix.cpp?p2=qpid/trunk/qpid/cpp/src/qpid/ha/LogPrefix.cpp&p1=qpid/trunk/qpid/cpp/src/qpid/ha/StandAlone.h&r1=1618964&r2=1619003&rev=1619003&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/StandAlone.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/LogPrefix.cpp Tue Aug 19 22:34:15 2014
@@ -1,6 +1,3 @@
-#ifndef QPID_HA_STANDALONE_H
-#define QPID_HA_STANDALONE_H
-
 /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -21,25 +18,18 @@
  * under the License.
  *
  */
-namespace qpid {
-struct Url;
+#include "LogPrefix.h"
+#include <iostream>
 
+namespace qpid {
 namespace ha {
 
-/**
- * Stand-alone role: acts as a stand-alone broker, no clustering.
- * HA module needed to setting up replication via QMF methods.
- */
-class StandAlone : public Role
-{
-  public:
-    std::string getLogPrefix() const { return logPrefix; }
-    Role* promote() { return 0; }
-    void setBrokerUrl(const Url&) {}
+std::ostream& operator<<(std::ostream& o, const LogPrefix& lp) {
+    return o << lp.get();
+}
 
-  private:
-    std::string logPrefix;
-};
-}} // namespace qpid::ha
+std::ostream& operator<<(std::ostream& o, const LogPrefix2& lp) {
+    return o << lp.prePrefix.get() << lp.get();
+}
 
-#endif  /*!QPID_HA_STANDALONE_H*/
+}} // namespace qpid::ha

Added: qpid/trunk/qpid/cpp/src/qpid/ha/LogPrefix.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/LogPrefix.h?rev=1619003&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/LogPrefix.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/LogPrefix.h Tue Aug 19 22:34:15 2014
@@ -0,0 +1,75 @@
+#ifndef QPID_HA_LOGPREFIX_H
+#define QPID_HA_LOGPREFIX_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/sys/Mutex.h>
+#include <string>
+#include <iosfwd>
+
+namespace qpid {
+namespace ha {
+
+/**
+ * Thread safe string holder to hold a string that may be read and modified concurrently.
+ */
+class LogPrefix
+{
+  public:
+    explicit LogPrefix(const std::string& s=std::string()) : prefix(s) {}
+    void set(const std::string& s) { sys::RWlock::ScopedWlock l(lock); prefix = s; }
+    std::string get() const { sys::RWlock::ScopedRlock l(lock); return prefix; }
+
+    LogPrefix& operator=(const std::string& s) { set(s); return *this; }
+    operator std::string() const { return get(); }
+
+  private:
+    // Undefined, not copyable.
+    LogPrefix(const LogPrefix& lp);
+    LogPrefix& operator=(const LogPrefix&);
+
+    mutable sys::RWlock lock;
+    std::string prefix;
+};
+std::ostream& operator<<(std::ostream& o, const LogPrefix& lp);
+
+/**
+ * A two-part log prefix with a reference to a pre-prefix and a post-prefix.
+ * Operator << will print both parts, get/set just manage the post-prefix.
+ */
+class LogPrefix2 : public LogPrefix {
+  public:
+    const LogPrefix& prePrefix;
+    explicit LogPrefix2(const LogPrefix& lp, const std::string& s=std::string()) : LogPrefix(s), prePrefix(lp) {}
+    LogPrefix2& operator=(const std::string& s) { set(s); return *this; }
+
+  private:
+    // Undefined, not copyable.
+    LogPrefix2(const LogPrefix2& lp);
+    LogPrefix2& operator=(const LogPrefix2&);
+};
+std::ostream& operator<<(std::ostream& o, const LogPrefix2& lp);
+
+
+}} // namespace qpid::ha
+
+#endif  /*!QPID_HA_LOGPREFIX_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp?rev=1619003&r1=1619002&r2=1619003&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Membership.cpp Tue Aug 19 22:34:15 2014
@@ -43,9 +43,14 @@ Membership::Membership(const BrokerInfo&
     : haBroker(b), self(info.getSystemId())
 {
     brokers[self] = info;
+    setPrefix();
     oldStatus = info.getStatus();
 }
 
+void Membership::setPrefix() {
+    haBroker.logPrefix = Msg() << shortStr(brokers[self].getSystemId())
+                               << "(" << printable(brokers[self].getStatus()) << ") ";
+}
 void Membership::clear() {
     Mutex::ScopedLock l(lock);
     BrokerInfo me = brokers[self];
@@ -57,7 +62,7 @@ void Membership::add(const BrokerInfo& b
     Mutex::ScopedLock l(lock);
     assert(b.getSystemId() != self);
     brokers[b.getSystemId()] = b;
-    update(l);
+    update(true, l);
 }
 
 
@@ -67,7 +72,7 @@ void Membership::remove(const types::Uui
     BrokerInfo::Map::iterator i = brokers.find(id);
     if (i != brokers.end()) {
         brokers.erase(i);
-        update(l);
+        update(true, l);
     }
 }
 
@@ -83,7 +88,7 @@ void Membership::assign(const types::Var
         BrokerInfo b(i->asMap());
         brokers[b.getSystemId()] = b;
     }
-    update(l);
+    update(true, l);
 }
 
 types::Variant::List Membership::asList() const {
@@ -144,8 +149,7 @@ bool checkTransition(BrokerStatus from, 
 }
 } // namespace
 
-void Membership::update(Mutex::ScopedLock& l) {
-    QPID_LOG(info, "Membership: " <<  brokers);
+void Membership::update(bool log, Mutex::ScopedLock& l) {
     // Update managment and send update event.
     BrokerStatus newStatus = getStatus(l);
     Variant::List brokerList = asList(l);
@@ -171,27 +175,30 @@ void Membership::update(Mutex::ScopedLoc
 
     // Check status transitions
     if (oldStatus != newStatus) {
-        QPID_LOG(info, "Status change: "
+        QPID_LOG(info, haBroker.logPrefix << "Status change: "
                  << printable(oldStatus) << " -> " << printable(newStatus));
         if (!checkTransition(oldStatus, newStatus)) {
             haBroker.shutdown(QPID_MSG("Illegal state transition: " << printable(oldStatus)
                                        << " -> " << printable(newStatus)));
         }
         oldStatus = newStatus;
+        setPrefix();
+        if (newStatus == READY) QPID_LOG(notice, haBroker.logPrefix << "Backup is ready");
     }
+    if (log) QPID_LOG(info, haBroker.logPrefix << "Membership update: " <<  brokers);
 }
 
 void Membership::setMgmtObject(boost::shared_ptr<_qmf::HaBroker> mo) {
     Mutex::ScopedLock l(lock);
     mgmtObject = mo;
-    update(l);
+    update(false, l);
 }
 
 
 void Membership::setStatus(BrokerStatus newStatus) {
     Mutex::ScopedLock l(lock);
     brokers[self].setStatus(newStatus);
-    update(l);
+    update(false, l);
 }
 
 BrokerStatus Membership::getStatus() const  {
@@ -215,7 +222,7 @@ BrokerInfo Membership::getSelf() const  
 void Membership::setSelfAddress(const Address& a) {
     Mutex::ScopedLock l(lock);
     brokers[self].setAddress(a);
-    update(l);
+    update(false, l);
 }
 
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Membership.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Membership.h?rev=1619003&r1=1619002&r2=1619003&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Membership.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Membership.h Tue Aug 19 22:34:15 2014
@@ -85,7 +85,8 @@ class Membership
     void setSelfAddress(const Address&);
 
   private:
-    void update(sys::Mutex::ScopedLock&);
+    void setPrefix();
+    void update(bool log, sys::Mutex::ScopedLock&);
     BrokerStatus getStatus(sys::Mutex::ScopedLock&) const;
     types::Variant::List asList(sys::Mutex::ScopedLock&) const;
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1619003&r1=1619002&r2=1619003&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp Tue Aug 19 22:34:15 2014
@@ -38,6 +38,7 @@
 #include "qpid/framing/FieldValue.h"
 #include "qpid/log/Statement.h"
 #include "qpid/types/Uuid.h"
+#include "qpid/types/Variant.h"
 #include "qpid/sys/Timer.h"
 #include <boost/bind.hpp>
 #include <boost/shared_ptr.hpp>
@@ -54,6 +55,10 @@ using namespace framing;
 
 namespace {
 
+const std::string CLIENT_PROCESS_NAME("qpid.client_process");
+const std::string CLIENT_PID("qpid.client_pid");
+const std::string CLIENT_PPID("qpid.client_ppid");
+
 class PrimaryConnectionObserver : public broker::ConnectionObserver
 {
   public:
@@ -90,7 +95,7 @@ class ExpectedBackupTimerTask : public s
 
 class PrimaryErrorListener : public broker::SessionHandler::ErrorListener {
   public:
-    PrimaryErrorListener(const std::string& logPrefix_) : logPrefix(logPrefix_) {}
+    PrimaryErrorListener(const LogPrefix& lp) : logPrefix(lp) {}
 
     void connectionException(framing::connection::CloseCode code, const std::string& msg) {
         QPID_LOG(debug, logPrefix << framing::createConnectionException(code, msg).what());
@@ -104,17 +109,15 @@ class PrimaryErrorListener : public brok
     void incomingExecutionException(framing::execution::ErrorCode code, const std::string& msg) {
         QPID_LOG(debug, logPrefix << "Incoming " << framing::createSessionException(code, msg).what());
     }
-    void detach() {
-        QPID_LOG(debug, logPrefix << "Session detached.");
-    }
+    void detach() {}
 
   private:
-    std::string logPrefix;
+    const LogPrefix& logPrefix;
 };
 
 class PrimarySessionHandlerObserver : public broker::SessionHandlerObserver {
   public:
-    PrimarySessionHandlerObserver(const std::string& logPrefix)
+    PrimarySessionHandlerObserver(const LogPrefix& logPrefix)
         : errorListener(new PrimaryErrorListener(logPrefix)) {}
     void newSessionHandler(broker::SessionHandler& sh) {
         BrokerInfo info;
@@ -133,7 +136,7 @@ class PrimarySessionHandlerObserver : pu
 
 Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
     haBroker(hb), membership(hb.getMembership()),
-    logPrefix("Primary: "), active(false),
+    logPrefix(hb.logPrefix), active(false),
     replicationTest(hb.getSettings().replicateDefault.get()),
     sessionHandlerObserver(new PrimarySessionHandlerObserver(logPrefix)),
     queueLimits(logPrefix, hb.getBroker().getQueues(), replicationTest)
@@ -142,6 +145,7 @@ Primary::Primary(HaBroker& hb, const Bro
     // So we are safe from client interference while we set up the primary.
 
     hb.getMembership().setStatus(RECOVERING);
+    QPID_LOG(notice, logPrefix << "Promoted to primary");
 
     // Process all QueueReplicators, handles auto-delete queues.
     QueueReplicator::Vector qrs;
@@ -152,10 +156,9 @@ Primary::Primary(HaBroker& hb, const Bro
         // NOTE: RemoteBackups must be created before we set the BrokerObserver
         // or ConnectionObserver so that there is no client activity while
         // the QueueGuards are created.
-        QPID_LOG(notice, logPrefix << "Promoted and recovering, waiting for backups: "
-                 << expect);
+        QPID_LOG(notice, logPrefix << "Recovering backups: " << expect);
         for (BrokerInfo::Set::const_iterator i = expect.begin(); i != expect.end(); ++i) {
-            boost::shared_ptr<RemoteBackup> backup(new RemoteBackup(*i, 0));
+            boost::shared_ptr<RemoteBackup> backup(new RemoteBackup(*i, 0, logPrefix));
             backups[i->getSystemId()] = backup;
             if (!backup->isReady()) expectedBackups.insert(backup);
             setCatchupQueues(backup, true); // Create guards
@@ -173,7 +176,7 @@ Primary::Primary(HaBroker& hb, const Bro
 
     // Allow client connections
     connectionObserver.reset(new PrimaryConnectionObserver(*this));
-    haBroker.getObserver()->setObserver(connectionObserver, logPrefix);
+    haBroker.getObserver()->setObserver(connectionObserver);
 }
 
 Primary::~Primary() {
@@ -191,8 +194,8 @@ void Primary::checkReady() {
             activate = active = true;
     }
     if (activate) {
-        QPID_LOG(notice, logPrefix << "Promoted and active.");
         membership.setStatus(ACTIVE); // Outside of lock.
+        QPID_LOG(notice, logPrefix << "All backups recovered.");
     }
 }
 
@@ -205,7 +208,7 @@ void Primary::checkReady(boost::shared_p
             info.setStatus(READY);
             membership.add(info);
             if (expectedBackups.erase(backup)) {
-                QPID_LOG(info, logPrefix << "Expected backup is ready: " << info);
+                QPID_LOG(info, logPrefix << "Recovering backup is ready: " << info);
                 ready = true;
             }
         else
@@ -229,7 +232,7 @@ void Primary::timeoutExpectedBackups() {
             boost::shared_ptr<RemoteBackup> backup = *j;
             if (!backup->getConnection()) {
                 BrokerInfo info = backup->getBrokerInfo();
-                QPID_LOG(error, logPrefix << "Expected backup timed out: " << info);
+                QPID_LOG(error, logPrefix << "Recovering backup timed out: " << info);
                 backupDisconnect(backup, l); // Calls erase(j)
                 // Keep broker in membership but downgrade status to CATCHUP.
                 // The broker will get this status change when it eventually connects.
@@ -303,6 +306,8 @@ void Primary::queueCreate(const QueuePtr
     ReplicateLevel level = replicationTest.useLevel(*q);
     q->addArgument(QPID_REPLICATE, printable(level).str());
     if (level) {
+        QPID_LOG(debug, logPrefix << "Created queue " << q->getName()
+                 << " replication: " << printable(level));
         // Give each queue a unique id. Used by backups to avoid confusion of
         // same-named queues.
         q->addArgument(QPID_HA_UUID, types::Variant(Uuid(true)));
@@ -312,8 +317,6 @@ void Primary::queueCreate(const QueuePtr
             for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i)
                 i->second->queueCreate(q);
         }
-        QPID_LOG(debug, logPrefix << "Created queue " << q->getName()
-                 << " replication: " << printable(level));
         checkReady();           // Outside lock
     }
 }
@@ -358,7 +361,7 @@ void Primary::exchangeDestroy(const Exch
 shared_ptr<RemoteBackup> Primary::backupConnect(
     const BrokerInfo& info, broker::Connection& connection, Mutex::ScopedLock&)
 {
-    shared_ptr<RemoteBackup> backup(new RemoteBackup(info, &connection));
+    shared_ptr<RemoteBackup> backup(new RemoteBackup(info, &connection, logPrefix));
     queueLimits.addBackup(backup);
     backups[info.getSystemId()] = backup;
     return backup;
@@ -382,7 +385,15 @@ void Primary::opened(broker::Connection&
     if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
         Mutex::ScopedLock l(lock);
         BackupMap::iterator i = backups.find(info.getSystemId());
+        if (info.getStatus() == JOINING) {
+            info.setStatus(CATCHUP);
+            membership.add(info);
+        }
         if (i == backups.end()) {
+            if (info.getStatus() == JOINING) {
+                info.setStatus(CATCHUP);
+                membership.add(info);
+            }
             QPID_LOG(info, logPrefix << "New backup connection: " << info);
             backup = backupConnect(info, connection, l);
         }
@@ -397,13 +408,20 @@ void Primary::opened(broker::Connection&
             i->second->setConnection(&connection);
             backup = i->second;
         }
-        if (info.getStatus() == JOINING) {
-            info.setStatus(CATCHUP);
-            membership.add(info);
+    }
+    else {
+        const types::Variant::Map& properties = connection.getClientProperties();
+        std::ostringstream pinfo;
+        types::Variant::Map::const_iterator i = properties.find(CLIENT_PROCESS_NAME);
+        // FIXME aconway 2014-08-13: Conditional on logging.
+        if (i != properties.end()) {
+            pinfo << "  " << i->second;
+            i = properties.find(CLIENT_PID);
+            if (i != properties.end())
+                pinfo << "(" << i->second << ")";
         }
+        QPID_LOG(info, logPrefix << "Accepted client connection " << connection.getMgmtId() << pinfo.str());
     }
-    else
-        QPID_LOG(debug, logPrefix << "Accepted client connection " << connection.getMgmtId());
 
     // Outside lock
     if (backup) {
@@ -448,7 +466,7 @@ boost::shared_ptr<QueueGuard> Primary::g
 }
 
 Role* Primary::promote() {
-    QPID_LOG(info, "Ignoring promotion, already primary: " << haBroker.getBrokerInfo());
+    QPID_LOG(info, logPrefix << "Ignoring promotion, already primary");
     return 0;
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h?rev=1619003&r1=1619002&r2=1619003&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h Tue Aug 19 22:34:15 2014
@@ -25,6 +25,7 @@
 #include "types.h"
 #include "hash.h"
 #include "BrokerInfo.h"
+#include "LogPrefix.h"
 #include "PrimaryQueueLimits.h"
 #include "ReplicationTest.h"
 #include "Role.h"
@@ -81,7 +82,6 @@ class Primary : public Role
     ~Primary();
 
     // Role implementation
-    std::string getLogPrefix() const { return logPrefix; }
     Role* promote();
     void setBrokerUrl(const Url&) {}
 
@@ -142,7 +142,7 @@ class Primary : public Role
     mutable sys::Mutex lock;
     HaBroker& haBroker;
     Membership& membership;
-    std::string logPrefix;
+    const LogPrefix& logPrefix;
     bool active;
     ReplicationTest replicationTest;
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h?rev=1619003&r1=1619002&r2=1619003&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h Tue Aug 19 22:34:15 2014
@@ -36,6 +36,7 @@ class Queue;
 }
 
 namespace ha {
+class LogPrefix;
 class RemoteBackup;
 
 /**
@@ -48,7 +49,7 @@ class PrimaryQueueLimits
 {
   public:
     // FIXME aconway 2014-01-24: hardcoded maxQueues, use negotiated channel-max
-    PrimaryQueueLimits(const std::string& lp,
+    PrimaryQueueLimits(const LogPrefix& lp,
                        broker::QueueRegistry& qr,
                        const ReplicationTest& rt
     ) :
@@ -97,7 +98,7 @@ class PrimaryQueueLimits
     void removeBackup(const boost::shared_ptr<RemoteBackup>&) {}
 
   private:
-    std::string logPrefix;
+    const LogPrefix& logPrefix;
     uint64_t maxQueues;
     uint64_t queues;
 }; 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp?rev=1619003&r1=1619002&r2=1619003&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp Tue Aug 19 22:34:15 2014
@@ -94,6 +94,7 @@ PrimaryTxObserver::PrimaryTxObserver(
     Primary& p, HaBroker& hb, const boost::intrusive_ptr<broker::TxBuffer>& tx
 ) :
     state(SENDING),
+    logPrefix(hb.logPrefix),
     primary(p), haBroker(hb), broker(hb.getBroker()),
     replicationTest(hb.getSettings().replicateDefault.get()),
     txBuffer(tx),
@@ -101,7 +102,7 @@ PrimaryTxObserver::PrimaryTxObserver(
     exchangeName(TRANSACTION_REPLICATOR_PREFIX+id.str()),
     empty(true)
 {
-    logPrefix = "Primary transaction "+shortStr(id)+": ";
+    logPrefix = "Primary TX "+shortStr(id)+": ";
 
     // The brokers known at this point are the ones that will be included
     // in the transaction. Brokers that join later are not included.
@@ -115,8 +116,7 @@ PrimaryTxObserver::PrimaryTxObserver(
     for (size_t i = 0; i < incomplete.size(); ++i)
         txBuffer->startCompleter();
 
-    QPID_LOG(debug, logPrefix << "Started TX " << id);
-    QPID_LOG(debug, logPrefix << "Backups: " << backups);
+    QPID_LOG(debug, logPrefix << "Started, backups " << backups);
 }
 
 void PrimaryTxObserver::initialize() {
@@ -140,9 +140,7 @@ void PrimaryTxObserver::initialize() {
 }
 
 
-PrimaryTxObserver::~PrimaryTxObserver() {
-    QPID_LOG(debug, logPrefix << "Ended");
-}
+PrimaryTxObserver::~PrimaryTxObserver() {}
 
 void PrimaryTxObserver::checkState(State expect, const std::string& msg) {
     if (state != expect)
@@ -254,7 +252,7 @@ void PrimaryTxObserver::end(Mutex::Scope
     try {
         broker.getExchanges().destroy(getExchangeName());
     } catch (const std::exception& e) {
-        QPID_LOG(error, logPrefix << "Deleting transaction exchange: "  << e.what());
+        QPID_LOG(error, logPrefix << "Deleting TX exchange: "  << e.what());
     }
 }
 
@@ -266,11 +264,12 @@ bool PrimaryTxObserver::completed(const 
     return false;
 }
 
-bool PrimaryTxObserver::error(const Uuid& id, const char* msg, Mutex::ScopedLock& l)
+bool PrimaryTxObserver::error(const Uuid& id, const std::string& msg, Mutex::ScopedLock& l)
 {
     if (incomplete.find(id) != incomplete.end()) {
         // Note: setError before completed since completed may trigger completion.
-        txBuffer->setError(QPID_MSG(logPrefix << msg << id));
+        // Only use the TX part of the log prefix.
+        txBuffer->setError(Msg() << logPrefix.get() << msg << shortStr(id) << ".");
         completed(id, l);
         return true;
     }
@@ -290,7 +289,7 @@ void PrimaryTxObserver::txPrepareOkEvent
 void PrimaryTxObserver::txPrepareFailEvent(const string& data) {
     Mutex::ScopedLock l(lock);
     types::Uuid backup = decodeStr<TxPrepareFailEvent>(data).broker;
-    if (error(backup, "Prepare failed on backup: ", l)) {
+    if (error(backup, "Prepare failed on backup ", l)) {
         QPID_LOG(error, logPrefix << "Prepare failed on backup " << backup);
     } else {
         QPID_LOG(error, logPrefix << "Unexpected prepare-fail response from " << backup);

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h?rev=1619003&r1=1619002&r2=1619003&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h Tue Aug 19 22:34:15 2014
@@ -24,6 +24,7 @@
 
 #include "types.h"
 #include "ReplicationTest.h"
+#include "LogPrefix.h"
 #include "qpid/broker/SessionState.h"
 #include "qpid/broker/TransactionObserver.h"
 #include "qpid/log/Statement.h"
@@ -105,11 +106,11 @@ class PrimaryTxObserver : public broker:
     void txPrepareOkEvent(const std::string& data);
     void txPrepareFailEvent(const std::string& data);
     bool completed(const types::Uuid& id, sys::Mutex::ScopedLock&);
-    bool error(const types::Uuid& id, const char* msg, sys::Mutex::ScopedLock& l);
+    bool error(const types::Uuid& id, const std::string& msg, sys::Mutex::ScopedLock& l);
 
     sys::Monitor lock;
     State state;
-    std::string logPrefix;
+    LogPrefix2 logPrefix;
     Primary& primary;
     HaBroker& haBroker;
     broker::Broker& broker;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp?rev=1619003&r1=1619002&r2=1619003&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp Tue Aug 19 22:34:15 2014
@@ -47,8 +47,8 @@ class QueueGuard::QueueObserver : public
 
 
 
-QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info)
-    : cancelled(false), queue(q)
+QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info, const LogPrefix& lp)
+    : cancelled(false), logPrefix(lp), queue(q)
 {
     std::ostringstream os;
     os << "Guard of " << queue.getName() << " at ";
@@ -61,7 +61,9 @@ QueueGuard::QueueGuard(broker::Queue& q,
     QueuePosition front, back;
     q.getRange(front, back, broker::REPLICATOR);
     first = back + 1;
-    QPID_LOG(debug, logPrefix << "First guarded position " << first);
+    QPID_LOG(debug, logPrefix << "Guarded: front " << front
+             << ", back " << back
+             << ", guarded " << first);
 }
 
 QueueGuard::~QueueGuard() { cancel(); }

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h?rev=1619003&r1=1619002&r2=1619003&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h Tue Aug 19 22:34:15 2014
@@ -24,6 +24,7 @@
 
 #include "types.h"
 #include "hash.h"
+#include "LogPrefix.h"
 #include "qpid/types/Uuid.h"
 #include "qpid/sys/Mutex.h"
 #include "qpid/sys/unordered_map.h"
@@ -59,7 +60,7 @@ class ReplicatingSubscription;
  */
 class QueueGuard {
   public:
-    QueueGuard(broker::Queue& q, const BrokerInfo&);
+    QueueGuard(broker::Queue& q, const BrokerInfo&, const LogPrefix&);
     ~QueueGuard();
 
     /** QueueObserver override. Delay completion of the message.
@@ -97,7 +98,7 @@ class QueueGuard {
     sys::Mutex lock;
     QueuePosition first;
     bool cancelled;
-    std::string logPrefix;
+    LogPrefix2 logPrefix;
     broker::Queue& queue;
     Delayed delayed;
     boost::shared_ptr<QueueObserver> observer;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1619003&r1=1619002&r2=1619003&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Tue Aug 19 22:34:15 2014
@@ -22,6 +22,7 @@
 #include "Event.h"
 #include "HaBroker.h"
 #include "IdSetter.h"
+#include "LogPrefix.h"
 #include "QueueReplicator.h"
 #include "QueueSnapshot.h"
 #include "ReplicatingSubscription.h"
@@ -97,12 +98,11 @@ class QueueReplicator::ErrorListener : p
             QPID_LOG(error, logPrefix << "Incoming "
                      << framing::createSessionException(code, msg).what());
     }
-    void detach() {
-        QPID_LOG(debug, logPrefix << "Session detached");
-    }
+    void detach() {}
+
   private:
     boost::weak_ptr<QueueReplicator> queueReplicator;
-    std::string logPrefix;
+    const LogPrefix& logPrefix;
 };
 
 class QueueReplicator::QueueObserver : public broker::QueueObserver {
@@ -152,11 +152,12 @@ QueueReplicator::QueueReplicator(HaBroke
       link(l),
       queue(q),
       sessionHandler(0),
-      logPrefix("Backup of "+q->getName()+": "),
+      logPrefix(hb.logPrefix, "Backup of "+q->getName()+": "),
       subscribed(false),
       settings(hb.getSettings()),
       nextId(0), maxId(0)
 {
+    QPID_LOG(debug, logPrefix << "Created");
     // The QueueReplicator will take over setting replication IDs.
     boost::shared_ptr<IdSetter> setter =
         q->getMessageInterceptors().findType<IdSetter>();
@@ -181,7 +182,6 @@ QueueReplicator::~QueueReplicator() {}
 
 void QueueReplicator::initialize() {
     Mutex::ScopedLock l(lock);
-    QPID_LOG(debug, logPrefix << "Created");
     if (!queue) return;         // Already destroyed
 
     // Enable callback to route()
@@ -255,10 +255,13 @@ void QueueReplicator::initializeBridge(B
     arguments.setInt(QPID_SYNC_FREQUENCY, 1); // TODO aconway 2012-05-22: optimize?
     arguments.setTable(ReplicatingSubscription::QPID_BROKER_INFO, brokerInfo.asFieldTable());
     boost::shared_ptr<QueueSnapshot> qs = queue->getObservers().findType<QueueSnapshot>();
-    if (qs) arguments.set(ReplicatingSubscription::QPID_ID_SET,
-                          FieldTable::ValuePtr(
-                              new Var32Value(encodeStr(qs->getSnapshot()), TYPE_CODE_VBIN32)));
-
+    ReplicationIdSet snapshot;
+    if (qs) {
+        snapshot = qs->getSnapshot();
+        arguments.set(
+            ReplicatingSubscription::QPID_ID_SET,
+            FieldTable::ValuePtr(new Var32Value(encodeStr(snapshot), TYPE_CODE_VBIN32)));
+    }
     try {
         peer.getMessage().subscribe(
             args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/,
@@ -268,12 +271,12 @@ void QueueReplicator::initializeBridge(B
         peer.getMessage().flow(getName(), 1, settings.getFlowBytes());
     }
     catch(const exception& e) {
-        QPID_LOG(error, QPID_MSG(logPrefix + "Cannot connect to primary: " << e.what()));
+        QPID_LOG(error, logPrefix << "Cannot connect to primary: " << e.what());
         throw;
     }
     qpid::Address primary;
     link->getRemoteAddress(primary);
-    QPID_LOG(debug, logPrefix << "Connected to " << primary << "(" << bridgeName << ")");
+    QPID_LOG(debug, logPrefix << "Connected to " << primary << " snapshot=" << snapshot << " bridge=" << bridgeName);
     QPID_LOG(trace, logPrefix << "Subscription arguments: " << arguments);
 }
 
@@ -391,7 +394,7 @@ void QueueReplicator::promoted() {
         // On primary QueueReplicator no longer sets IDs, start an IdSetter.
         QPID_LOG(debug, logPrefix << "Promoted, first replication-id " << maxId+1)
         queue->getMessageInterceptors().add(
-            boost::shared_ptr<IdSetter>(new IdSetter(queue->getName(), maxId+1)));
+            boost::shared_ptr<IdSetter>(new IdSetter(logPrefix, queue->getName(), maxId+1)));
         // Process auto-deletes
         if (queue->isAutoDelete()) {
             // Make a temporary shared_ptr to prevent premature deletion of queue.

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h?rev=1619003&r1=1619002&r2=1619003&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h Tue Aug 19 22:34:15 2014
@@ -22,7 +22,10 @@
  *
  */
 
+
+
 #include "BrokerInfo.h"
+#include "LogPrefix.h"
 #include "hash.h"
 #include "qpid/broker/Exchange.h"
 #include <boost/enable_shared_from_this.hpp>
@@ -134,7 +137,7 @@ class QueueReplicator : public broker::E
 
     bool deletedOnPrimary(framing::execution::ErrorCode e, const std::string& msg);
 
-    std::string logPrefix;
+    LogPrefix2 logPrefix;
     std::string bridgeName;
 
     bool subscribed;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp?rev=1619003&r1=1619002&r2=1619003&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp Tue Aug 19 22:34:15 2014
@@ -35,13 +35,13 @@ using sys::Mutex;
 using boost::bind;
 
 RemoteBackup::RemoteBackup(
-    const BrokerInfo& info, broker::Connection* c
-) : brokerInfo(info), replicationTest(NONE), started(false), connection(c), reportedReady(false)
+    const BrokerInfo& info, broker::Connection* c, const LogPrefix& lp
+) : logPrefix(lp), brokerInfo(info), replicationTest(NONE),
+    started(false), connection(c), reportedReady(false)
 {
     std::ostringstream oss;
     oss << "Remote backup at " << info << ": ";
     logPrefix = oss.str();
-    QPID_LOG(debug, logPrefix << (c? "Connected" : "Expected"));
 }
 
 RemoteBackup::~RemoteBackup() {
@@ -70,7 +70,7 @@ void RemoteBackup::catchupQueue(const Qu
         QPID_LOG(debug, logPrefix << "Catch-up queue"
                  << (createGuard ? " and guard" : "") << ": " << q->getName());
         catchupQueues.insert(q);
-        if (createGuard) guards[q].reset(new QueueGuard(*q, brokerInfo));
+        if (createGuard) guards[q].reset(new QueueGuard(*q, brokerInfo, logPrefix));
     }
 }
 
@@ -86,18 +86,12 @@ RemoteBackup::GuardPtr RemoteBackup::gua
 
 void RemoteBackup::ready(const QueuePtr& q) {
     catchupQueues.erase(q);
-    if (catchupQueues.size()) {
-        QPID_LOG(debug, logPrefix << "Caught up on queue: " << q->getName() << ", "
-                 << catchupQueues.size() << " remain to catch up");
-    }
-    else
-        QPID_LOG(debug, logPrefix << "Caught up on queue: " << q->getName() );
 }
 
 // Called via BrokerObserver::queueCreate and from catchupQueue
 void RemoteBackup::queueCreate(const QueuePtr& q) {
     if (replicationTest.getLevel(*q) == ALL)
-        guards[q].reset(new QueueGuard(*q, brokerInfo));
+        guards[q].reset(new QueueGuard(*q, brokerInfo, logPrefix));
 }
 
 // Called via BrokerObserver
@@ -112,6 +106,7 @@ void RemoteBackup::queueDestroy(const Qu
 
 bool RemoteBackup::reportReady() {
     if (!reportedReady && isReady()) {
+        if (catchupQueues.empty()) QPID_LOG(debug, logPrefix << "Caught up.");
         reportedReady = true;
         return true;
     }

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h?rev=1619003&r1=1619002&r2=1619003&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h Tue Aug 19 22:34:15 2014
@@ -22,6 +22,7 @@
  *
  */
 
+#include "LogPrefix.h"
 #include "ReplicationTest.h"
 #include "BrokerInfo.h"
 #include "types.h"
@@ -56,7 +57,7 @@ class RemoteBackup
     /** Note: isReady() can be true after construction
      *@param connected true if the backup is already connected.
      */
-    RemoteBackup(const BrokerInfo&, broker::Connection*);
+    RemoteBackup(const BrokerInfo&, broker::Connection*, const LogPrefix&);
     ~RemoteBackup();
 
     /** Return guard associated with a queue. Used to create ReplicatingSubscription. */
@@ -102,7 +103,7 @@ class RemoteBackup
 
     typedef std::set<QueuePtr> QueueSet;
 
-    std::string logPrefix;
+    LogPrefix2 logPrefix;
     BrokerInfo brokerInfo;
     ReplicationTest replicationTest;
     GuardMap guards;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1619003&r1=1619002&r2=1619003&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Tue Aug 19 22:34:15 2014
@@ -107,7 +107,7 @@ ReplicatingSubscription::ReplicatingSubs
     const framing::FieldTable& arguments
 ) : ConsumerImpl(parent, name, queue_, ack, REPLICATOR, exclusive, tag,
                  resumeId, resumeTtl, arguments),
-
+    logPrefix(hb.logPrefix),
     position(0), wasStopped(false), ready(false), cancelled(false),
     haBroker(hb),
     primary(boost::dynamic_pointer_cast<Primary>(haBroker.getRole()))
@@ -121,7 +121,7 @@ void ReplicatingSubscription::initialize
         FieldTable ft;
         if (!getArguments().getTable(ReplicatingSubscription::QPID_BROKER_INFO, ft))
             throw InvalidArgumentException(
-                logPrefix+"Can't subscribe, no broker info: "+getTag());
+                logPrefix.get()+"Can't subscribe, no broker info: "+getTag());
         info.assign(ft);
 
         // Set a log prefix message that identifies the remote broker.
@@ -132,7 +132,7 @@ void ReplicatingSubscription::initialize
 
         // If there's already a guard (we are in failover) use it, else create one.
         if (primary) guard = primary->getGuard(queue, info);
-        if (!guard) guard.reset(new QueueGuard(*queue, info));
+        if (!guard) guard.reset(new QueueGuard(*queue, info, logPrefix.prePrefix));
 
         // NOTE: Once the observer is attached we can have concurrent
         // calls to dequeued so we need to lock use of this->dequeues.
@@ -147,7 +147,7 @@ void ReplicatingSubscription::initialize
         if (!snapshot) {
             queue->getObservers().remove(
                 boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this()));
-            throw ResourceDeletedException(logPrefix+"Can't subscribe, queue deleted");
+            throw ResourceDeletedException(logPrefix.get()+"Can't subscribe, queue deleted");
         }
         ReplicationIdSet primaryIds = snapshot->getSnapshot();
         std::string backupStr = getArguments().getAsString(ReplicatingSubscription::QPID_ID_SET);
@@ -166,10 +166,10 @@ void ReplicatingSubscription::initialize
             // position >= front so if front is safe then position must be.
             position = front;
 
-            QPID_LOG(debug, logPrefix << "Subscribed: front " << front
-                     << ", back " << back
+            QPID_LOG(debug, logPrefix << "Subscribed: primary ["
+                     << front << "," << back << "]=" << primaryIds
                      << ", guarded " << guard->getFirst()
-                     << ", on backup " << skipEnqueue);
+                     << ", backup (keep " << skipEnqueue << ", drop " << initDequeues << ")");
             checkReady(l);
         }
 
@@ -242,7 +242,12 @@ void ReplicatingSubscription::checkReady
         ready = true;
         sys::Mutex::ScopedUnlock u(lock);
         // Notify Primary that a subscription is ready.
-        QPID_LOG(debug, logPrefix << "Caught up");
+        if (position+1 >= guard->getFirst()) {
+            QPID_LOG(debug, logPrefix << "Caught up at " << position);
+        } else {
+            QPID_LOG(debug, logPrefix << "Caught up at " << position << "short of guard at " << guard->getFirst());
+        }
+
         if (primary) primary->readyReplica(*this);
     }
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1619003&r1=1619002&r2=1619003&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h Tue Aug 19 22:34:15 2014
@@ -23,6 +23,7 @@
  */
 
 #include "BrokerInfo.h"
+#include "LogPrefix.h"
 #include "qpid/broker/SemanticState.h"
 #include "qpid/broker/ConsumerFactory.h"
 #include "qpid/broker/QueueObserver.h"
@@ -144,7 +145,7 @@ class ReplicatingSubscription :
     bool doDispatch();
 
   private:
-    std::string logPrefix;
+    LogPrefix2 logPrefix;
     QueuePosition position;
     ReplicationIdSet dequeues;  // Dequeues to be sent in next dequeue event.
     ReplicationIdSet skipEnqueue; // Enqueues to skip: messages already on backup and tx enqueues.

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Role.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Role.h?rev=1619003&r1=1619002&r2=1619003&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Role.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Role.h Tue Aug 19 22:34:15 2014
@@ -40,9 +40,6 @@ class Role
   public:
     virtual ~Role() {}
 
-    /** Log prefix appropriate to the role */
-    virtual std::string getLogPrefix() const = 0;
-
     /** QMF promote method handler.
      * @return The new role if promoted, 0 if not. Caller takes ownership.
      */

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/StandAlone.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/StandAlone.h?rev=1619003&r1=1619002&r2=1619003&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/StandAlone.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/StandAlone.h Tue Aug 19 22:34:15 2014
@@ -33,12 +33,8 @@ namespace ha {
 class StandAlone : public Role
 {
   public:
-    std::string getLogPrefix() const { return logPrefix; }
     Role* promote() { return 0; }
     void setBrokerUrl(const Url&) {}
-
-  private:
-    std::string logPrefix;
 };
 }} // namespace qpid::ha
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp?rev=1619003&r1=1619002&r2=1619003&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp Tue Aug 19 22:34:15 2014
@@ -39,10 +39,11 @@
 #include "qpid/broker/DeliverableMessage.h"
 #include "qpid/framing/BufferTypes.h"
 #include "qpid/log/Statement.h"
-#include <boost/shared_ptr.hpp>
-#include <boost/bind.hpp>
 #include "qpid/broker/amqp_0_10/MessageTransfer.h"
 #include "qpid/framing/MessageTransferBody.h"
+#include <boost/shared_ptr.hpp>
+#include <boost/bind.hpp>
+#include <sstream>
 
 namespace qpid {
 namespace ha {
@@ -57,15 +58,19 @@ namespace {
 const string PREFIX(TRANSACTION_REPLICATOR_PREFIX);
 } // namespace
 
-
-
 bool TxReplicator::isTxQueue(const string& q) {
     return startsWith(q, PREFIX);
 }
 
-string TxReplicator::getTxId(const string& q) {
-    assert(isTxQueue(q));
-    return q.substr(PREFIX.size());
+Uuid TxReplicator::getTxId(const string& q) {
+    if (TxReplicator::isTxQueue(q)) {
+        std::istringstream is(q);
+        is.seekg(PREFIX.size());
+        Uuid id;
+        is >> id;
+        if (!is.fail()) return id;
+    }
+    throw Exception(QPID_MSG("Invalid tx queue: " << q));
 }
 
 string TxReplicator::getType() const { return ReplicatingSubscription::QPID_TX_REPLICATOR; }
@@ -85,15 +90,14 @@ TxReplicator::TxReplicator(
     const boost::shared_ptr<broker::Queue>& txQueue,
     const boost::shared_ptr<broker::Link>& link) :
     QueueReplicator(hb, txQueue, link),
+    logPrefix(hb.logPrefix),
     store(hb.getBroker().hasStore() ? &hb.getBroker().getStore() : 0),
     channel(link->nextChannel()),
     empty(true), ended(false),
     dequeueState(hb.getBroker().getQueues())
 {
-    string id(getTxId(txQueue->getName()));
-    string shortId = id.substr(0, 8);
-    logPrefix = "Backup of transaction "+shortId+": ";
-    QPID_LOG(debug, logPrefix << "Started TX " << id);
+    logPrefix = "Backup of TX "+shortStr(getTxId(txQueue->getName()))+": ";
+    QPID_LOG(debug, logPrefix << "Started");
     if (!store) throw Exception(QPID_MSG(logPrefix << "No message store loaded."));
 
     // Dispatch transaction events.
@@ -213,7 +217,7 @@ void TxReplicator::prepare(const string&
         QPID_LOG(debug, logPrefix << "Local prepare OK");
         sendMessage(TxPrepareOkEvent(haBroker.getSystemId()).message(queue->getName()), l);
     } else {
-        QPID_LOG(debug, logPrefix << "Local prepare failed");
+        QPID_LOG(error, logPrefix << "Local prepare failed");
         sendMessage(TxPrepareFailEvent(haBroker.getSystemId()).message(queue->getName()), l);
     }
 }
@@ -240,7 +244,7 @@ void TxReplicator::backups(const string&
     TxBackupsEvent e;
     decodeStr(data, e);
     if (!e.backups.count(haBroker.getMembership().getSelf().getSystemId())) {
-        QPID_LOG(info, logPrefix << "Not participating in transaction");
+        QPID_LOG(info, logPrefix << "Not participating");
         end(l);
     } else {
         QPID_LOG(debug, logPrefix << "Backups: " << e.backups);

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.h?rev=1619003&r1=1619002&r2=1619003&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.h Tue Aug 19 22:34:15 2014
@@ -22,11 +22,13 @@
  *
  */
 
+#include "LogPrefix.h"
 #include "QueueReplicator.h"
 #include "Event.h"
 #include "qpid/broker/DeliveryRecord.h"
 #include "qpid/broker/TransactionalStore.h"
 #include "qpid/sys/Mutex.h"
+#include "qpid/types/Uuid.h"
 
 namespace qpid {
 
@@ -56,7 +58,7 @@ class TxReplicator : public QueueReplica
     typedef boost::shared_ptr<broker::Link> LinkPtr;
 
     static bool isTxQueue(const std::string& queue);
-    static std::string getTxId(const std::string& queue);
+    static types::Uuid getTxId(const std::string& queue);
 
     static boost::shared_ptr<TxReplicator> create(
         HaBroker&, const QueuePtr& txQueue, const LinkPtr& link);
@@ -90,7 +92,7 @@ class TxReplicator : public QueueReplica
     void backups(const std::string& data, sys::Mutex::ScopedLock&);
     void end(sys::Mutex::ScopedLock&);
 
-    std::string logPrefix;
+    LogPrefix2 logPrefix;
     TxEnqueueEvent enq;         // Enqueue data for next deliver.
     boost::intrusive_ptr<broker::TxBuffer> txBuffer;
     broker::MessageStore* store;

Modified: qpid/trunk/qpid/cpp/src/tests/qpid-txtest2.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid-txtest2.cpp?rev=1619003&r1=1619002&r2=1619003&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid-txtest2.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid-txtest2.cpp Tue Aug 19 22:34:15 2014
@@ -165,7 +165,7 @@ struct Client
             session.close();
             connection.close();
         } catch(const std::exception& e) {
-            std::cout << e.what() << std::endl;
+            std::cout << "Client shutdown: " << e.what() << std::endl;
         }
     }
 };
@@ -350,7 +350,7 @@ int main(int argc, char** argv)
         }
         return 0;
     } catch(const std::exception& e) {
-	std::cout << e.what() << std::endl;
+	std::cout << argv[0] << ": " << e.what() << std::endl;
     }
     return 2;
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org