You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2009/07/02 20:09:13 UTC

svn commit: r790698 - in /qpid/trunk/qpid/cpp/src/qpid/broker: Bridge.cpp Bridge.h Connection.cpp Connection.h LinkRegistry.cpp LinkRegistry.h

Author: tross
Date: Thu Jul  2 18:09:12 2009
New Revision: 790698

URL: http://svn.apache.org/viewvc?rev=790698&view=rev
Log:
Federation: Propagation of dynamic bindings is now done on the thread servicing the
            federation link (connection).

Also, some minor cleanup of unneeded recursive includes.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=790698&r1=790697&r2=790698&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp Thu Jul  2 18:09:12 2009
@@ -21,11 +21,11 @@
 #include "Bridge.h"
 #include "ConnectionState.h"
 #include "Connection.h"
+#include "Link.h"
 #include "LinkRegistry.h"
 #include "SessionState.h"
 
 #include "qpid/management/ManagementAgent.h"
-#include "qpid/framing/FieldTable.h"
 #include "qpid/framing/Uuid.h"
 #include "qpid/log/Statement.h"
 #include <iostream>
@@ -84,6 +84,7 @@
 void Bridge::create(Connection& c)
 {
     connState = &c;
+    conn = &c;
     FieldTable options;
     if (args.i_sync) options.setInt("qpid.sync_frequency", args.i_sync);
     SessionHandler& sessionHandler = c.getChannel(id);
@@ -288,7 +289,8 @@
          else
              bindArgs.setString(qpidFedOrigin, origin);
 
-         peer->getExchange().bind(queueName, args.i_src, key, bindArgs);
+         conn->requestIOProcessing(boost::bind(&Bridge::ioThreadPropagateBinding, this,
+                                               queueName, args.i_src, key, bindArgs));
     }
 }
 
@@ -299,7 +301,13 @@
     bindArgs.setString(qpidFedOp, fedOpReorigin);
     bindArgs.setString(qpidFedTags, link->getBroker()->getFederationTag());
 
-    peer->getExchange().bind(queueName, args.i_src, args.i_key, bindArgs);
+    conn->requestIOProcessing(boost::bind(&Bridge::ioThreadPropagateBinding, this,
+                                          queueName, args.i_src, args.i_key, bindArgs));
+}
+
+void Bridge::ioThreadPropagateBinding(const string& queue, const string& exchange, const string& key, FieldTable args)
+{
+    peer->getExchange().bind(queue, exchange, key, args);
 }
 
 bool Bridge::containsLocalTag(const string& tagList) const

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h?rev=790698&r1=790697&r2=790698&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h Thu Jul  2 18:09:12 2009
@@ -26,6 +26,7 @@
 #include "qpid/framing/ChannelHandler.h"
 #include "qpid/framing/Buffer.h"
 #include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/FieldTable.h"
 #include "qpid/management/Manageable.h"
 #include "Exchange.h"
 #include "qmf/org/apache/qpid/broker/ArgsLinkBridge.h"
@@ -74,6 +75,7 @@
     // Exchange::DynamicBridge methods
     void propagateBinding(const std::string& key, const std::string& tagList, const std::string& op, const std::string& origin);
     void sendReorigin();
+    void ioThreadPropagateBinding(const string& queue, const string& exchange, const string& key, framing::FieldTable args);
     bool containsLocalTag(const std::string& tagList) const;
     const std::string& getLocalTag() const;
 
@@ -98,6 +100,7 @@
     std::string queueName;
     mutable uint64_t  persistenceId;
     ConnectionState* connState;
+    Connection* conn;
 };
 
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=790698&r1=790697&r2=790698&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Thu Jul  2 18:09:12 2009
@@ -21,6 +21,7 @@
 #include "Connection.h"
 #include "SessionState.h"
 #include "Bridge.h"
+#include "Broker.h"
 
 #include "qpid/log/Statement.h"
 #include "qpid/ptr_map.h"
@@ -268,13 +269,15 @@
 bool Connection::hasOutput() { return outputTasks.hasOutput(); }
 
 bool Connection::doOutput() {
-    try{
+    try {
         {
-        ScopedLock<Mutex> l(ioCallbackLock);
-        while (!ioCallbacks.empty()) {
-            ioCallbacks.front()(); // Lend the IO thread for management processing
-            ioCallbacks.pop();
-        }
+            ScopedLock<Mutex> l(ioCallbackLock);
+            while (!ioCallbacks.empty()) {
+                boost::function0<void> cb = ioCallbacks.front();
+                ioCallbacks.pop();
+                ScopedUnlock<Mutex> ul(ioCallbackLock);
+                cb(); // Lend the IO thread for management processing
+            }
         }
         if (mgmtClosing)
             close(connection::CLOSE_CODE_CONNECTION_FORCED, "Closed by Management Request");

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=790698&r1=790697&r2=790698&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Thu Jul  2 18:09:12 2009
@@ -29,7 +29,6 @@
 
 #include <boost/ptr_container/ptr_map.hpp>
 
-#include "Broker.h"
 #include "ConnectionHandler.h"
 #include "ConnectionState.h"
 #include "SessionHandler.h"
@@ -58,6 +57,7 @@
 namespace qpid {
 namespace broker {
 
+class Broker;
 class LinkRegistry;
 class SecureConnection;
 struct ConnectionTimeoutTask;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp?rev=790698&r1=790697&r2=790698&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp Thu Jul  2 18:09:12 2009
@@ -19,6 +19,7 @@
  *
  */
 #include "LinkRegistry.h"
+#include "Link.h"
 #include "Connection.h"
 #include "qpid/log/Statement.h"
 #include <iostream>

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h?rev=790698&r1=790697&r2=790698&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h Thu Jul  2 18:09:12 2009
@@ -23,17 +23,18 @@
  */
 
 #include <map>
-#include "Link.h"
 #include "Bridge.h"
 #include "MessageStore.h"
 #include "Timer.h"
 #include "qpid/Address.h"
 #include "qpid/sys/Mutex.h"
 #include "qpid/management/Manageable.h"
+#include <boost/shared_ptr.hpp>
 
 namespace qpid {
 namespace broker {
 
+    class Link;
     class Broker;
     class Connection;
     class LinkRegistry {
@@ -49,7 +50,7 @@
             void fire();
         };
 
-        typedef std::map<std::string, Link::shared_ptr> LinkMap;
+        typedef std::map<std::string, boost::shared_ptr<Link> > LinkMap;
         typedef std::map<std::string, Bridge::shared_ptr> BridgeMap;
         typedef std::map<std::string, TcpAddress> AddressMap;
 
@@ -70,12 +71,12 @@
 
         void periodicMaintenance ();
         bool updateAddress(const std::string& oldKey, const TcpAddress& newAddress);
-        Link::shared_ptr findLink(const std::string& key);
+        boost::shared_ptr<Link> findLink(const std::string& key);
         static std::string createKey(const TcpAddress& address);
 
     public:
         LinkRegistry (Broker* _broker);
-        std::pair<Link::shared_ptr, bool>
+        std::pair<boost::shared_ptr<Link>, bool>
             declare(std::string& host,
                     uint16_t     port,
                     std::string& transport,



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org