You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2011/12/22 21:36:02 UTC

svn commit: r1222431 - in /qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha: QueueReplicator.cpp QueueReplicator.h ReplicatingSubscription.cpp ReplicatingSubscription.h

Author: aconway
Date: Thu Dec 22 20:36:02 2011
New Revision: 1222431

URL: http://svn.apache.org/viewvc?rev=1222431&view=rev
Log:
QPID-3603: Format static log prefixes at consutruction time.

Creating the prefix dynamically caused sporadic core dumps with trace
logging on. It is also inefficient.

Modified:
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1222431&r1=1222430&r2=1222431&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Thu Dec 22 20:36:02 2011
@@ -31,7 +31,7 @@
 #include "qpid/framing/FieldTable.h"
 #include "qpid/log/Statement.h"
 #include <boost/shared_ptr.hpp>
-#include <ostream>
+#include <sstream>
 
 namespace {
 const std::string QPID_REPLICATOR_("qpid.replicator-");
@@ -51,14 +51,13 @@ std::string QueueReplicator::replicatorN
     return QPID_REPLICATOR_ + queueName;
 }
 
-std::ostream& operator<<(std::ostream& o, const QueueReplicator& qr) {
-    return o << "HA: Backup queue " << qr.queue->getName() << ": ";
-}
-
 QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l)
     : Exchange(replicatorName(q->getName()), 0, q->getBroker()), queue(q), link(l)
 {
-    QPID_LOG(info, *this << "Created, settings: " << q->getSettings());
+    std::stringstream ss;
+    ss << "HA: Backup queue " << queue->getName() << ": ";
+    logPrefix = ss.str();
+    QPID_LOG(info, logPrefix << "Created, settings: " << q->getSettings());
 }
 
 // This must be separate from the constructor so we can call shared_from_this.
@@ -112,7 +111,7 @@ void QueueReplicator::initializeBridge(B
     peer.getMessage().subscribe(args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, false, "", 0, settings);
     peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
     peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
-    QPID_LOG(debug, *this << "Activated bridge from " << args.i_src << " to " << args.i_dest);
+    QPID_LOG(debug, logPrefix << "Activated bridge from " << args.i_src << " to " << args.i_dest);
     // Reset self reference so this will be deleted when all external refs are gone.
     self.reset();
 }
@@ -134,7 +133,7 @@ void QueueReplicator::dequeue(SequenceNu
         QueuedMessage message;
         if (queue->acquireMessageAt(n, message)) {
             queue->dequeue(0, message);
-            QPID_LOG(trace, *this << "Dequeued message "<< message.position);
+            QPID_LOG(trace, logPrefix << "Dequeued message "<< message.position);
         }
     }
 }
@@ -145,13 +144,13 @@ void QueueReplicator::route(Deliverable&
     sys::Mutex::ScopedLock l(lock);
     if (key == DEQUEUE_EVENT_KEY) {
         SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage());
-        QPID_LOG(trace, *this << "Received dequeues: " << dequeues);
+        QPID_LOG(trace, logPrefix << "Received dequeues: " << dequeues);
         //TODO: should be able to optimise the following
         for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end(); i++)
             dequeue(*i, l);
     } else if (key == POSITION_EVENT_KEY) {
         SequenceNumber position = decodeContent<SequenceNumber>(msg.getMessage());
-        QPID_LOG(trace, *this << "Advance position: from " << queue->getPosition()
+        QPID_LOG(trace, logPrefix << "Advance position: from " << queue->getPosition()
                  << " to " << position);
         assert(queue->getPosition() <= position);
          //TODO aconway 2011-12-14: Optimize this?
@@ -160,7 +159,7 @@ void QueueReplicator::route(Deliverable&
         queue->setPosition(position);
     } else {
         msg.deliverTo(queue);
-        QPID_LOG(trace, *this << "Enqueued message " << queue->getPosition());
+        QPID_LOG(trace, logPrefix << "Enqueued message " << queue->getPosition());
     }
 }
 

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h?rev=1222431&r1=1222430&r2=1222431&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h Thu Dec 22 20:36:02 2011
@@ -71,11 +71,11 @@ class QueueReplicator : public broker::E
     void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler);
     void dequeue(framing::SequenceNumber, const sys::Mutex::ScopedLock&);
 
+    std::string logPrefix;
     sys::Mutex lock;
     boost::shared_ptr<broker::Queue> queue;
     boost::shared_ptr<broker::Link> link;
     boost::shared_ptr<QueueReplicator> self;
-  friend std::ostream& operator<<(std::ostream&, const QueueReplicator&);
 };
 
 }} // namespace qpid::ha

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1222431&r1=1222430&r2=1222431&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Thu Dec 22 20:36:02 2011
@@ -26,7 +26,7 @@
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/log/Statement.h"
-#include "ostream"
+#include <sstream>
 
 namespace qpid {
 namespace ha {
@@ -42,13 +42,6 @@ const string DOLLAR("$");
 const string INTERNAL("-internal");
 } // namespace
 
-
-ostream& operator<<(ostream& o, const ReplicatingSubscription& rs) {
-    string url = rs.parent->getSession().getConnection().getUrl();
-    string qname= rs.getQueue()->getName();
-    return o << "HA: Primary: " << qname << "(" <<  url << "):";
-}
-
 string mask(const string& in)
 {
     return DOLLAR + in + INTERNAL;
@@ -95,6 +88,12 @@ ReplicatingSubscription::ReplicatingSubs
     events(new Queue(mask(name))),
     consumer(new DelegatingConsumer(*this))
 {
+    stringstream ss;
+    string url = parent->getSession().getConnection().getUrl();
+    string qname = getQueue()->getName();
+    ss << "HA: Primary queue " << qname << ", backup " <<  url << ": ";
+    logPrefix = ss.str();
+    
     // FIXME aconway 2011-12-09: Failover optimization removed.
     // There was code here to re-use messages already on the backup
     // during fail-over. This optimization was removed to simplify
@@ -102,7 +101,7 @@ ReplicatingSubscription::ReplicatingSubs
     // can be re-introduced later. Last revision with the optimization:
     // r1213258 | QPID-3603: Fix QueueReplicator subscription parameters.
 
-    QPID_LOG(debug, *this << "Created subscription " << name);
+    QPID_LOG(debug, logPrefix << "Created subscription " << name);
 
     // Note that broker::Queue::getPosition() returns the sequence
     // number that will be assigned to the next message *minus 1*.
@@ -133,12 +132,12 @@ bool ReplicatingSubscription::deliver(Qu
                  SequenceNumber send(position);
                  --send;   // Send the position before m was enqueued.
                  sendPositionEvent(send, l); 
-                 QPID_LOG(trace, *this << "Sending position " << send
+                 QPID_LOG(trace, logPrefix << "Sending position " << send
                           << ", was " << backupPosition);
              }
              backupPosition = position;
         }
-        QPID_LOG(trace, *this << "Replicating message " << m.position);
+        QPID_LOG(trace, logPrefix << "Replicating message " << m.position);
     }
     return ConsumerImpl::deliver(m);
 }
@@ -148,7 +147,7 @@ ReplicatingSubscription::~ReplicatingSub
 // Called in the subscription's connection thread.
 void ReplicatingSubscription::cancel()
 {
-    QPID_LOG(debug, *this <<"Cancelled");
+    QPID_LOG(debug, logPrefix <<"Cancelled");
     getQueue()->removeObserver(boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
 }
 
@@ -163,7 +162,7 @@ void ReplicatingSubscription::enqueued(c
 // Called with lock held. Called in subscription's connection thread.
 void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock& l)
 {
-    QPID_LOG(trace, *this << "Sending dequeues " << dequeues);
+    QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues);
     string buf(dequeues.encodedSize(),'\0');
     framing::Buffer buffer(&buf[0], buf.size());
     dequeues.encode(buffer);
@@ -219,7 +218,7 @@ void ReplicatingSubscription::sendEvent(
 // the message lock in the queue. Called in arbitrary connection threads.
 void ReplicatingSubscription::dequeued(const QueuedMessage& m)
 {
-    QPID_LOG(trace, *this << "Dequeued message " << m.position);
+    QPID_LOG(trace, logPrefix << "Dequeued message " << m.position);
     {
         sys::Mutex::ScopedLock l(lock);
         dequeues.add(m.position);
@@ -229,7 +228,7 @@ void ReplicatingSubscription::dequeued(c
     // we're not in the dispatch thread.
     if (m.position > position) {
         m.payload->getIngressCompletion().finishCompleter();
-        QPID_LOG(trace, *this << "Completed message " << m.position << " early");
+        QPID_LOG(trace, logPrefix << "Completed message " << m.position << " early");
     }
 }
 

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1222431&r1=1222430&r2=1222431&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h Thu Dec 22 20:36:02 2011
@@ -88,6 +88,7 @@ class ReplicatingSubscription : public b
   protected:
     bool doDispatch();
   private:
+    std::string logPrefix;
     boost::shared_ptr<broker::Queue> events;
     boost::shared_ptr<broker::Consumer> consumer;
     qpid::framing::SequenceSet dequeues;
@@ -97,6 +98,7 @@ class ReplicatingSubscription : public b
     void sendPositionEvent(framing::SequenceNumber, const sys::Mutex::ScopedLock&);
     void sendEvent(const std::string& key, framing::Buffer&,
                    const sys::Mutex::ScopedLock&);
+
     class DelegatingConsumer : public Consumer
     {
       public:
@@ -111,9 +113,6 @@ class ReplicatingSubscription : public b
       private:
         ReplicatingSubscription& delegate;
     };
-
-    /** Print a identifier for a ReplicatingSubscription */
-    friend std::ostream& operator<<(std::ostream&, const ReplicatingSubscription&);
 };
 
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org