You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2009/07/02 20:09:13 UTC
svn commit: r790698 - in /qpid/trunk/qpid/cpp/src/qpid/broker: Bridge.cpp
Bridge.h Connection.cpp Connection.h LinkRegistry.cpp LinkRegistry.h
Author: tross
Date: Thu Jul 2 18:09:12 2009
New Revision: 790698
URL: http://svn.apache.org/viewvc?rev=790698&view=rev
Log:
Federation: Propagation of dynamic bindings is now done on the thread servicing the
federation link (connection).
Also, some minor cleanup of unneeded nested header includes (replaced by forward
declarations, with the includes moved into the .cpp files that need them).
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h
qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=790698&r1=790697&r2=790698&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp Thu Jul 2 18:09:12 2009
@@ -21,11 +21,11 @@
#include "Bridge.h"
#include "ConnectionState.h"
#include "Connection.h"
+#include "Link.h"
#include "LinkRegistry.h"
#include "SessionState.h"
#include "qpid/management/ManagementAgent.h"
-#include "qpid/framing/FieldTable.h"
#include "qpid/framing/Uuid.h"
#include "qpid/log/Statement.h"
#include <iostream>
@@ -84,6 +84,7 @@
void Bridge::create(Connection& c)
{
connState = &c;
+ conn = &c;
FieldTable options;
if (args.i_sync) options.setInt("qpid.sync_frequency", args.i_sync);
SessionHandler& sessionHandler = c.getChannel(id);
@@ -288,7 +289,8 @@
else
bindArgs.setString(qpidFedOrigin, origin);
- peer->getExchange().bind(queueName, args.i_src, key, bindArgs);
+ conn->requestIOProcessing(boost::bind(&Bridge::ioThreadPropagateBinding, this,
+ queueName, args.i_src, key, bindArgs));
}
}
@@ -299,7 +301,13 @@
bindArgs.setString(qpidFedOp, fedOpReorigin);
bindArgs.setString(qpidFedTags, link->getBroker()->getFederationTag());
- peer->getExchange().bind(queueName, args.i_src, args.i_key, bindArgs);
+ conn->requestIOProcessing(boost::bind(&Bridge::ioThreadPropagateBinding, this,
+ queueName, args.i_src, args.i_key, bindArgs));
+}
+
+void Bridge::ioThreadPropagateBinding(const string& queue, const string& exchange, const string& key, FieldTable args)
+{
+ peer->getExchange().bind(queue, exchange, key, args);
}
bool Bridge::containsLocalTag(const string& tagList) const
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h?rev=790698&r1=790697&r2=790698&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h Thu Jul 2 18:09:12 2009
@@ -26,6 +26,7 @@
#include "qpid/framing/ChannelHandler.h"
#include "qpid/framing/Buffer.h"
#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/FieldTable.h"
#include "qpid/management/Manageable.h"
#include "Exchange.h"
#include "qmf/org/apache/qpid/broker/ArgsLinkBridge.h"
@@ -74,6 +75,7 @@
// Exchange::DynamicBridge methods
void propagateBinding(const std::string& key, const std::string& tagList, const std::string& op, const std::string& origin);
void sendReorigin();
+ void ioThreadPropagateBinding(const string& queue, const string& exchange, const string& key, framing::FieldTable args);
bool containsLocalTag(const std::string& tagList) const;
const std::string& getLocalTag() const;
@@ -98,6 +100,7 @@
std::string queueName;
mutable uint64_t persistenceId;
ConnectionState* connState;
+ Connection* conn;
};
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=790698&r1=790697&r2=790698&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Thu Jul 2 18:09:12 2009
@@ -21,6 +21,7 @@
#include "Connection.h"
#include "SessionState.h"
#include "Bridge.h"
+#include "Broker.h"
#include "qpid/log/Statement.h"
#include "qpid/ptr_map.h"
@@ -268,13 +269,15 @@
bool Connection::hasOutput() { return outputTasks.hasOutput(); }
bool Connection::doOutput() {
- try{
+ try {
{
- ScopedLock<Mutex> l(ioCallbackLock);
- while (!ioCallbacks.empty()) {
- ioCallbacks.front()(); // Lend the IO thread for management processing
- ioCallbacks.pop();
- }
+ ScopedLock<Mutex> l(ioCallbackLock);
+ while (!ioCallbacks.empty()) {
+ boost::function0<void> cb = ioCallbacks.front();
+ ioCallbacks.pop();
+ ScopedUnlock<Mutex> ul(ioCallbackLock);
+ cb(); // Lend the IO thread for management processing
+ }
}
if (mgmtClosing)
close(connection::CLOSE_CODE_CONNECTION_FORCED, "Closed by Management Request");
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=790698&r1=790697&r2=790698&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Thu Jul 2 18:09:12 2009
@@ -29,7 +29,6 @@
#include <boost/ptr_container/ptr_map.hpp>
-#include "Broker.h"
#include "ConnectionHandler.h"
#include "ConnectionState.h"
#include "SessionHandler.h"
@@ -58,6 +57,7 @@
namespace qpid {
namespace broker {
+class Broker;
class LinkRegistry;
class SecureConnection;
struct ConnectionTimeoutTask;
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp?rev=790698&r1=790697&r2=790698&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp Thu Jul 2 18:09:12 2009
@@ -19,6 +19,7 @@
*
*/
#include "LinkRegistry.h"
+#include "Link.h"
#include "Connection.h"
#include "qpid/log/Statement.h"
#include <iostream>
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h?rev=790698&r1=790697&r2=790698&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h Thu Jul 2 18:09:12 2009
@@ -23,17 +23,18 @@
*/
#include <map>
-#include "Link.h"
#include "Bridge.h"
#include "MessageStore.h"
#include "Timer.h"
#include "qpid/Address.h"
#include "qpid/sys/Mutex.h"
#include "qpid/management/Manageable.h"
+#include <boost/shared_ptr.hpp>
namespace qpid {
namespace broker {
+ class Link;
class Broker;
class Connection;
class LinkRegistry {
@@ -49,7 +50,7 @@
void fire();
};
- typedef std::map<std::string, Link::shared_ptr> LinkMap;
+ typedef std::map<std::string, boost::shared_ptr<Link> > LinkMap;
typedef std::map<std::string, Bridge::shared_ptr> BridgeMap;
typedef std::map<std::string, TcpAddress> AddressMap;
@@ -70,12 +71,12 @@
void periodicMaintenance ();
bool updateAddress(const std::string& oldKey, const TcpAddress& newAddress);
- Link::shared_ptr findLink(const std::string& key);
+ boost::shared_ptr<Link> findLink(const std::string& key);
static std::string createKey(const TcpAddress& address);
public:
LinkRegistry (Broker* _broker);
- std::pair<Link::shared_ptr, bool>
+ std::pair<boost::shared_ptr<Link>, bool>
declare(std::string& host,
uint16_t port,
std::string& transport,
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org