You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2011/12/21 23:35:22 UTC

svn commit: r1221921 - in /qpid/branches/qpid-3603/qpid/cpp/src: qpid/ha/QueueReplicator.cpp qpid/ha/QueueReplicator.h qpid/ha/WiringReplicator.cpp tests/qpid-cluster-benchmark

Author: aconway
Date: Wed Dec 21 22:35:22 2011
New Revision: 1221921

URL: http://svn.apache.org/viewvc?rev=1221921&view=rev
Log:
QPID-3603: Delete HA resources (QueueReplicator) along with the queues.

Modified:
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/tests/qpid-cluster-benchmark

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1221921&r1=1221920&r2=1221921&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Wed Dec 21 22:35:22 2011
@@ -59,7 +59,13 @@ QueueReplicator::QueueReplicator(boost::
     : Exchange(replicatorName(q->getName()), 0, q->getBroker()), queue(q), link(l)
 {
     QPID_LOG(info, *this << "Created, settings: " << q->getSettings());
+}
 
+// This must be separate from the constructor so we can call shared_from_this.
+void QueueReplicator::activate() {
+    // Take a reference to myself to ensure this object is not deleted before
+    // initializeBridge is called.
+    self = shared_from_this();
     queue->getBroker()->getLinks().declare(
         link->getHost(), link->getPort(),
         false,              // durable
@@ -72,19 +78,19 @@ QueueReplicator::QueueReplicator(boost::
         "",                 // excludes
         false,              // dynamic
         0,                  // sync?
+        // Binds raw this; the self reference taken above ensures we are not deleted before initializeBridge is called.
         boost::bind(&QueueReplicator::initializeBridge, this, _1, _2)
     );
 }
 
 QueueReplicator::~QueueReplicator() {
-    // FIXME aconway 2011-12-21: causes race condition? Restore.
-//     queue->getBroker()->getLinks().destroy(
-//         link->getHost(), link->getPort(), queue->getName(), getName(), string());
+    queue->getBroker()->getLinks().destroy(
+        link->getHost(), link->getPort(), queue->getName(), getName(), string());
 }
 
 // Called in a broker connection thread when the bridge is created.
-void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler)
-{
+// The self shared_ptr member (set in activate) is held just to ensure we are still in memory here.
+void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
     framing::AMQP_ServerProxy peer(sessionHandler.out);
     const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
     framing::FieldTable settings;
@@ -107,6 +113,8 @@ void QueueReplicator::initializeBridge(B
     peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
     peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
     QPID_LOG(debug, *this << "Activated bridge from " << args.i_src << " to " << args.i_dest);
+    // Reset self reference so this will be deleted when all external refs are gone.
+    self.reset();
 }
 
 namespace {

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h?rev=1221921&r1=1221920&r2=1221921&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h Wed Dec 21 22:35:22 2011
@@ -23,6 +23,7 @@
  */
 #include "qpid/broker/Exchange.h"
 #include "qpid/framing/SequenceSet.h"
+#include <boost/enable_shared_from_this.hpp>
 #include <iosfwd>
 
 namespace qpid {
@@ -47,7 +48,8 @@ namespace ha {
  *
  * THREAD UNSAFE: Only called in the connection thread of the source queue.
  */
-class QueueReplicator : public broker::Exchange
+class QueueReplicator : public broker::Exchange,
+                        public boost::enable_shared_from_this<QueueReplicator>
 {
   public:
     static const std::string DEQUEUE_EVENT_KEY;
@@ -57,6 +59,8 @@ class QueueReplicator : public broker::E
     QueueReplicator(boost::shared_ptr<broker::Queue> q, boost::shared_ptr<broker::Link> l);
     ~QueueReplicator();
 
+    void activate();
+
     std::string getType() const;
     bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
     bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
@@ -70,7 +74,7 @@ class QueueReplicator : public broker::E
     sys::Mutex lock;
     boost::shared_ptr<broker::Queue> queue;
     boost::shared_ptr<broker::Link> link;
-
+    boost::shared_ptr<QueueReplicator> self;
   friend std::ostream& operator<<(std::ostream&, const QueueReplicator&);
 };
 

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp?rev=1221921&r1=1221920&r2=1221921&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp Wed Dec 21 22:35:22 2011
@@ -295,9 +295,8 @@ void WiringReplicator::doEventQueueDelet
             name,
             values[USER].asString(),
             values[RHOST].asString());
-        // FIXME aconway 2011-12-21: casuses race conditions? Restore.
-//         // Also delete the QueueReplicator exchange for this queue.
-//         broker.getExchanges().destroy(QueueReplicator::replicatorName(name));
+        // Delete the QueueReplicator exchange for this queue.
+        broker.getExchanges().destroy(QueueReplicator::replicatorName(name));
     }
 }
 
@@ -449,9 +448,9 @@ void WiringReplicator::doResponseBind(Va
 }
 
 void WiringReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) {
-    // FIXME aconway 2011-11-28: also need to remove these when queue is destroyed.
     if (replicateLevel(queue->getSettings()) == RL_ALL) {
         boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link));
+        qr->activate();
         broker.getExchanges().registerExchange(qr);
     }
 }

Modified: qpid/branches/qpid-3603/qpid/cpp/src/tests/qpid-cluster-benchmark
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/tests/qpid-cluster-benchmark?rev=1221921&r1=1221920&r2=1221921&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/tests/qpid-cluster-benchmark (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/tests/qpid-cluster-benchmark Wed Dec 21 22:35:22 2011
@@ -30,7 +30,7 @@ RECEIVERS="-r 3"
 BROKERS=			# Local broker
 CLIENT_HOSTS=			# No ssh, all clients are local
 
-while getopts "m:f:n:b:q:s:r:c:txyv" opt; do
+while getopts "m:f:n:b:q:s:r:c:txyv-" opt; do
     case $opt in
 	m) MESSAGES="-m $OPTARG";;
 	f) FLOW="--flow-control $OPTARG";;
@@ -44,15 +44,16 @@ while getopts "m:f:n:b:q:s:r:c:txyv" opt
 	x) SAVE_RECEIVED="--save-received";;
 	y) NO_DELETE="--no-delete";;
 	v) OPTS="--verbose";;
+	-) break ;;
 	*) echo "Unknown option"; exit 1;;
     esac
 done
+shift $(($OPTIND-1))
+
 REPLICATE="node:{x-declare:{arguments:{'qpid.replicate':all}}}"
 BROKER=$(echo $BROKERS | sed s/,.*//)
 run_test() { echo $*; shift; "$@"; echo; echo; echo; }
 
 OPTS="$OPTS $REPEAT $BROKERS --summarize $QUEUES $SENDERS $RECEIVERS $MESSAGES $CLIENT_HOSTS $SAVE_RECEIVED $TCP_NODELAY $NO_DELETE"
 OPTS="$OPTS --create-option $REPLICATE"
-run_test "Benchmark:" qpid-cpp-benchmark $OPTS
-
-
+run_test "Benchmark:" qpid-cpp-benchmark $OPTS "$@"



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org