You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2010/04/08 19:58:26 UTC
svn commit: r932032 - in /qpid/trunk/qpid/cpp/src/qpid/broker:
DirectExchange.cpp FanOutExchange.cpp HeadersExchange.cpp TopicExchange.cpp
Author: kgiusti
Date: Thu Apr 8 17:58:26 2010
New Revision: 932032
URL: http://svn.apache.org/viewvc?rev=932032&view=rev
Log:
QPID-2487: always save the federation origin, even if the queue is already bound
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp?rev=932032&r1=932031&r2=932032&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp Thu Apr 8 17:58:26 2010
@@ -76,6 +76,9 @@ bool DirectExchange::bind(Queue::shared_
BoundKey& bk = bindings[routingKey];
if (exclusiveBinding) bk.queues.clear();
+ QPID_LOG(debug, "Bind key [" << routingKey << "] to queue " << queue->getName()
+ << " (origin=" << fedOrigin << ")");
+
if (bk.queues.add_unless(b, MatchQueue(queue))) {
b->startManagement();
propagate = bk.fedBinding.addOrigin(fedOrigin);
@@ -83,11 +86,17 @@ bool DirectExchange::bind(Queue::shared_
mgmtExchange->inc_bindingCount();
}
} else {
+ // queue already present - still need to track fedOrigin
+ bk.fedBinding.addOrigin(fedOrigin);
return false;
}
} else if (fedOp == fedOpUnbind) {
Mutex::ScopedLock l(lock);
BoundKey& bk = bindings[routingKey];
+
+ QPID_LOG(debug, "Bind - fedOpUnbind key [" << routingKey << "] queue " << queue->getName()
+ << " (origin=" << fedOrigin << ")");
+
propagate = bk.fedBinding.delOrigin(fedOrigin);
if (bk.fedBinding.count() == 0)
unbind(queue, routingKey, 0);
@@ -123,6 +132,8 @@ bool DirectExchange::unbind(Queue::share
{
bool propagate = false;
+ QPID_LOG(debug, "Unbind key [" << routingKey << "] from queue " << queue->getName());
+
{
Mutex::ScopedLock l(lock);
BoundKey& bk = bindings[routingKey];
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp?rev=932032&r1=932031&r2=932032&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp Thu Apr 8 17:58:26 2010
@@ -69,6 +69,8 @@ bool FanOutExchange::bind(Queue::shared_
mgmtExchange->inc_bindingCount();
}
} else {
+ // queue already present - still need to track fedOrigin
+ fedBinding.addOrigin(fedOrigin);
return false;
}
} else if (fedOp == fedOpUnbind) {
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp?rev=932032&r1=932031&r2=932032&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp Thu Apr 8 17:58:26 2010
@@ -120,6 +120,7 @@ bool HeadersExchange::bind(Queue::shared
mgmtExchange->inc_bindingCount();
}
} else {
+ bk.fedBinding.addOrigin(fedOrigin);
return false;
}
} // lock dropped
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp?rev=932032&r1=932031&r2=932032&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp Thu Apr 8 17:58:26 2010
@@ -205,6 +205,9 @@ bool TopicExchange::bind(Queue::shared_p
if (args == 0 || fedOp.empty() || fedOp == fedOpBind) {
RWlock::ScopedWlock l(lock);
if (isBound(queue, routingPattern)) {
+ // already bound, but may be from a different fedOrigin
+ BoundKey& bk = bindings[routingPattern];
+ bk.fedBinding.addOrigin(fedOrigin);
return false;
} else {
Binding::shared_ptr binding (new Binding (routingPattern, queue, this, FieldTable(), fedOrigin));
@@ -215,7 +218,7 @@ bool TopicExchange::bind(Queue::shared_p
if (mgmtExchange != 0) {
mgmtExchange->inc_bindingCount();
}
- QPID_LOG(debug, "Bound [" << routingPattern << "] to queue " << queue->getName()
+ QPID_LOG(debug, "Bound key [" << routingPattern << "] to queue " << queue->getName()
<< " (origin=" << fedOrigin << ")");
}
} else if (fedOp == fedOpUnbind) {
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org