You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2010/03/08 04:46:45 UTC

svn commit: r920189 - in /qpid/trunk/qpid/cpp/src: ./ qpid/broker/ qpid/management/

Author: tross
Date: Mon Mar  8 03:46:44 2010
New Revision: 920189

URL: http://svn.apache.org/viewvc?rev=920189&view=rev
Log:
Added hooks in the broker for QMFv2 management.
Both DirectExchange and TopicExchange are now subclassed so that messages can be
redirected to the embedded management agent (in QMFv1, only the topic exchange was
subclassed this way).

Added:
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.h
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp
      - copied, changed from r920168, qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.h
      - copied, changed from r920168, qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.h
Removed:
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.h
Modified:
    qpid/trunk/qpid/cpp/src/Makefile.am
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h

Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=920189&r1=920188&r2=920189&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Mon Mar  8 03:46:44 2010
@@ -636,8 +636,10 @@
   qpid/management/IdAllocator.h \
   qpid/management/ManagementAgent.cpp \
   qpid/management/ManagementAgent.h \
-  qpid/management/ManagementExchange.cpp \
-  qpid/management/ManagementExchange.h \
+  qpid/management/ManagementDirectExchange.cpp \
+  qpid/management/ManagementDirectExchange.h \
+  qpid/management/ManagementTopicExchange.cpp \
+  qpid/management/ManagementTopicExchange.h \
   qpid/sys/TCPIOPlugin.cpp
 
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=920189&r1=920188&r2=920189&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Mon Mar  8 03:46:44 2010
@@ -35,7 +35,8 @@
 #include "qmf/org/apache/qpid/broker/Package.h"
 #include "qmf/org/apache/qpid/broker/ArgsBrokerEcho.h"
 #include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h"
-#include "qpid/management/ManagementExchange.h"
+#include "qpid/management/ManagementDirectExchange.h"
+#include "qpid/management/ManagementTopicExchange.h"
 #include "qpid/log/Statement.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/ProtocolInitiation.h"
@@ -234,11 +235,22 @@
     declareStandardExchange(amq_match, HeadersExchange::typeName);
 
     if(conf.enableMgmt) {
-        exchanges.declare(qpid_management, ManagementExchange::typeName);
-        Exchange::shared_ptr mExchange = exchanges.get (qpid_management);
-        Exchange::shared_ptr dExchange = exchanges.get (amq_direct);
+        exchanges.declare(qpid_management, ManagementTopicExchange::typeName);
+        Exchange::shared_ptr mExchange = exchanges.get(qpid_management);
+        Exchange::shared_ptr dExchange = exchanges.get(amq_direct);
         managementAgent->setExchange(mExchange, dExchange);
-        boost::dynamic_pointer_cast<ManagementExchange>(mExchange)->setManagmentAgent(managementAgent.get());
+        boost::dynamic_pointer_cast<ManagementTopicExchange>(mExchange)->setManagmentAgent(managementAgent.get(), 1);
+
+        std::string qmfTopic("qmf.default.topic");
+        std::string qmfDirect("qmf.default.direct");
+
+        std::pair<Exchange::shared_ptr, bool> topicPair(exchanges.declare(qmfTopic, ManagementTopicExchange::typeName));
+        std::pair<Exchange::shared_ptr, bool> directPair(exchanges.declare(qmfDirect, ManagementDirectExchange::typeName));
+
+        boost::dynamic_pointer_cast<ManagementDirectExchange>(directPair.first)->setManagmentAgent(managementAgent.get(), 2);
+        boost::dynamic_pointer_cast<ManagementTopicExchange>(topicPair.first)->setManagmentAgent(managementAgent.get(), 2);
+
+        managementAgent->setExchangeV2(topicPair.first, directPair.first);
     }
     else
         QPID_LOG(info, "Management not enabled");

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp?rev=920189&r1=920188&r2=920189&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp Mon Mar  8 03:46:44 2010
@@ -24,7 +24,8 @@
 #include "qpid/broker/FanOutExchange.h"
 #include "qpid/broker/HeadersExchange.h"
 #include "qpid/broker/TopicExchange.h"
-#include "qpid/management/ManagementExchange.h"
+#include "qpid/management/ManagementDirectExchange.h"
+#include "qpid/management/ManagementTopicExchange.h"
 #include "qpid/framing/reply_exceptions.h"
 
 using namespace qpid::broker;
@@ -52,8 +53,10 @@
             exchange = Exchange::shared_ptr(new FanOutExchange(name, durable, args, parent, broker));
         }else if (type == HeadersExchange::typeName) {
             exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args, parent, broker));
-        }else if (type == ManagementExchange::typeName) {
-            exchange = Exchange::shared_ptr(new ManagementExchange(name, durable, args, parent, broker));
+        }else if (type == ManagementDirectExchange::typeName) {
+            exchange = Exchange::shared_ptr(new ManagementDirectExchange(name, durable, args, parent, broker));
+        }else if (type == ManagementTopicExchange::typeName) {
+            exchange = Exchange::shared_ptr(new ManagementTopicExchange(name, durable, args, parent, broker));
         }else{
             FunctionMap::iterator i =  factory.find(type);
             if (i == factory.end()) {

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=920189&r1=920188&r2=920189&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Mon Mar  8 03:46:44 2010
@@ -71,11 +71,13 @@
         Mutex::ScopedLock lock (userLock);
 
         // Reset the shared pointers to exchanges.  If this is not done now, the exchanges
-        // will stick around until dExchange and mExchange are implicitely destroyed (long
+        // will stick around until dExchange and mExchange are implicitly destroyed (long
         // after this destructor completes).  Those exchanges hold references to management
         // objects that will be invalid.
         dExchange.reset();
         mExchange.reset();
+        v2Topic.reset();
+        v2Direct.reset();
 
         moveNewObjectsLH();
         for (ManagementObjectMap::iterator iter = managementObjects.begin ();
@@ -158,13 +160,20 @@
     }
 }
 
-void ManagementAgent::setExchange (qpid::broker::Exchange::shared_ptr _mexchange,
-                                   qpid::broker::Exchange::shared_ptr _dexchange)
+void ManagementAgent::setExchange(qpid::broker::Exchange::shared_ptr _mexchange,
+                                  qpid::broker::Exchange::shared_ptr _dexchange)
 {
     mExchange = _mexchange;
     dExchange = _dexchange;
 }
 
+void ManagementAgent::setExchangeV2(qpid::broker::Exchange::shared_ptr _texchange,
+                                    qpid::broker::Exchange::shared_ptr _dexchange)
+{
+    v2Topic = _texchange;
+    v2Direct = _dexchange;
+}
+
 void ManagementAgent::registerClass (const string&  packageName,
                                      const string&  className,
                                      uint8_t* md5Sum,

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=920189&r1=920188&r2=920189&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h Mon Mar  8 03:46:44 2010
@@ -74,9 +74,12 @@
     /** Called by cluster to suppress management output during update. */
     void suppress(bool s) { suppressed = s; }
 
-    void setInterval     (uint16_t _interval) { interval = _interval; }
-    void setExchange     (qpid::broker::Exchange::shared_ptr mgmtExchange,
-                          qpid::broker::Exchange::shared_ptr directExchange);
+    void setInterval(uint16_t _interval) { interval = _interval; }
+    void setExchange(qpid::broker::Exchange::shared_ptr mgmtExchange,
+                     qpid::broker::Exchange::shared_ptr directExchange);
+    void setExchangeV2(qpid::broker::Exchange::shared_ptr topicExchange,
+                       qpid::broker::Exchange::shared_ptr directExchange);
+
     int  getMaxThreads   () { return threadPoolSize; }
     QPID_BROKER_EXTERN void registerClass   (const std::string& packageName,
                                              const std::string& className,
@@ -240,6 +243,8 @@
 
     qpid::broker::Exchange::shared_ptr mExchange;
     qpid::broker::Exchange::shared_ptr dExchange;
+    qpid::broker::Exchange::shared_ptr v2Topic;
+    qpid::broker::Exchange::shared_ptr v2Direct;
     std::string                  dataDir;
     uint16_t                     interval;
     qpid::broker::Broker*        broker;

Added: qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp?rev=920189&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.cpp Mon Mar  8 03:46:44 2010
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/management/ManagementDirectExchange.h"
+#include "qpid/log/Statement.h"
+#include <assert.h>
+
+using namespace qpid::management;
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+ManagementDirectExchange::ManagementDirectExchange(const string& _name, Manageable* _parent, Broker* b) :
+    Exchange (_name, _parent, b), DirectExchange(_name, _parent, b) {}
+ManagementDirectExchange::ManagementDirectExchange(const std::string& _name,
+                                                   bool               _durable,
+                                                   const FieldTable&  _args,
+                                                   Manageable*        _parent, Broker* b) :
+    Exchange (_name, _durable, _args, _parent, b), 
+    DirectExchange(_name, _durable, _args, _parent, b) {}
+
+void ManagementDirectExchange::route(Deliverable&      msg,
+                                     const string&     routingKey,
+                                     const FieldTable* args)
+{
+    bool routeIt = true;
+
+    // TODO: Intercept messages directed to the embedded agent and send them to the management agent.
+
+    if (routeIt)
+        DirectExchange::route(msg, routingKey, args);
+}
+
+void ManagementDirectExchange::setManagmentAgent(ManagementAgent* agent, int qv)
+{
+    managementAgent = agent;
+    qmfVersion = qv;
+    assert(qmfVersion == 2); // QMFv1 doesn't use a specialized direct exchange
+}
+
+
+ManagementDirectExchange::~ManagementDirectExchange() {}
+
+const std::string ManagementDirectExchange::typeName("management-direct");
+

Added: qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.h?rev=920189&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementDirectExchange.h Mon Mar  8 03:46:44 2010
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ManagementDirectExchange_
+#define _ManagementDirectExchange_
+
+#include "qpid/broker/DirectExchange.h"
+#include "qpid/management/ManagementAgent.h"
+
+namespace qpid {
+namespace broker {
+
+class ManagementDirectExchange : public virtual DirectExchange
+{
+  private:
+    management::ManagementAgent* managementAgent;
+    int qmfVersion;
+ 
+  public:
+    static const std::string typeName;
+
+    ManagementDirectExchange(const string& name, Manageable* _parent = 0, Broker* broker = 0);
+    ManagementDirectExchange(const string& _name, bool _durable, 
+                             const qpid::framing::FieldTable& _args,
+                             Manageable* _parent = 0, Broker* broker = 0);
+
+    virtual std::string getType() const { return typeName; }
+
+    virtual void route(Deliverable& msg,
+                       const string& routingKey,
+                       const qpid::framing::FieldTable* args);
+
+    void setManagmentAgent(management::ManagementAgent* agent, int qmfVersion);
+
+    virtual ~ManagementDirectExchange();
+};
+
+
+}
+}
+
+#endif

Copied: qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp (from r920168, qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp?p2=qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp&p1=qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp&r1=920168&r2=920189&rev=920189&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.cpp Mon Mar  8 03:46:44 2010
@@ -19,7 +19,7 @@
  *
  */
 
-#include "qpid/management/ManagementExchange.h"
+#include "qpid/management/ManagementTopicExchange.h"
 #include "qpid/log/Statement.h"
 
 using namespace qpid::management;
@@ -27,46 +27,50 @@
 using namespace qpid::framing;
 using namespace qpid::sys;
 
-ManagementExchange::ManagementExchange (const string& _name, Manageable* _parent, Broker* b) :
+ManagementTopicExchange::ManagementTopicExchange(const string& _name, Manageable* _parent, Broker* b) :
     Exchange (_name, _parent, b), TopicExchange(_name, _parent, b) {}
-ManagementExchange::ManagementExchange (const std::string& _name,
-                                        bool               _durable,
-                                        const FieldTable&  _args,
-                                        Manageable*        _parent, Broker* b) :
+ManagementTopicExchange::ManagementTopicExchange(const std::string& _name,
+                                                 bool               _durable,
+                                                 const FieldTable&  _args,
+                                                 Manageable*        _parent, Broker* b) :
     Exchange (_name, _durable, _args, _parent, b), 
     TopicExchange(_name, _durable, _args, _parent, b) {}
 
-void ManagementExchange::route (Deliverable&      msg,
-                                const string&     routingKey,
-                                const FieldTable* args)
+void ManagementTopicExchange::route(Deliverable&      msg,
+                                    const string&     routingKey,
+                                    const FieldTable* args)
 {
     bool routeIt = true;
 
     // Intercept management agent commands
-    if ((routingKey.length() > 6 &&
-         routingKey.substr(0, 6).compare("agent.") == 0) ||
-        (routingKey == "broker"))
-        routeIt = managementAgent->dispatchCommand(msg, routingKey, args);
+    if (qmfVersion == 1) {
+        if ((routingKey.length() > 6 &&
+             routingKey.substr(0, 6).compare("agent.") == 0) ||
+            (routingKey == "broker"))
+            routeIt = managementAgent->dispatchCommand(msg, routingKey, args);
+    }
 
     if (routeIt)
         TopicExchange::route(msg, routingKey, args);
 }
 
-bool ManagementExchange::bind (Queue::shared_ptr queue,
-                               const string& routingKey,
-                               const qpid::framing::FieldTable* args)
+bool ManagementTopicExchange::bind(Queue::shared_ptr queue,
+                                   const string& routingKey,
+                                   const qpid::framing::FieldTable* args)
 {
-    managementAgent->clientAdded(routingKey);
+    if (qmfVersion == 1)
+        managementAgent->clientAdded(routingKey);
     return TopicExchange::bind(queue, routingKey, args);
 }
 
-void ManagementExchange::setManagmentAgent (ManagementAgent* agent)
+void ManagementTopicExchange::setManagmentAgent(ManagementAgent* agent, int qv)
 {
     managementAgent = agent;
+    qmfVersion = qv;
 }
 
 
-ManagementExchange::~ManagementExchange() {}
+ManagementTopicExchange::~ManagementTopicExchange() {}
 
-const std::string ManagementExchange::typeName("management");
+const std::string ManagementTopicExchange::typeName("management-topic");
 

Copied: qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.h (from r920168, qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.h)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.h?p2=qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.h&p1=qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.h&r1=920168&r2=920189&rev=920189&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementExchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementTopicExchange.h Mon Mar  8 03:46:44 2010
@@ -18,8 +18,8 @@
  * under the License.
  *
  */
-#ifndef _ManagementExchange_
-#define _ManagementExchange_
+#ifndef _ManagementTopicExchange_
+#define _ManagementTopicExchange_
 
 #include "qpid/broker/TopicExchange.h"
 #include "qpid/management/ManagementAgent.h"
@@ -27,32 +27,33 @@
 namespace qpid {
 namespace broker {
 
-class ManagementExchange : public virtual TopicExchange
+class ManagementTopicExchange : public virtual TopicExchange
 {
   private:
     management::ManagementAgent* managementAgent;
+    int qmfVersion;
  
   public:
     static const std::string typeName;
 
-    ManagementExchange (const string& name, Manageable* _parent = 0, Broker* broker = 0);
-    ManagementExchange (const string& _name, bool _durable, 
-                        const qpid::framing::FieldTable& _args,
-                        Manageable* _parent = 0, Broker* broker = 0);
+    ManagementTopicExchange(const string& name, Manageable* _parent = 0, Broker* broker = 0);
+    ManagementTopicExchange(const string& _name, bool _durable, 
+                            const qpid::framing::FieldTable& _args,
+                            Manageable* _parent = 0, Broker* broker = 0);
 
     virtual std::string getType() const { return typeName; }
 
-    virtual void route (Deliverable& msg,
-                        const string& routingKey,
-                        const qpid::framing::FieldTable* args);
-
-    virtual bool bind (Queue::shared_ptr queue,
+    virtual void route(Deliverable& msg,
                        const string& routingKey,
                        const qpid::framing::FieldTable* args);
 
-    void setManagmentAgent (management::ManagementAgent* agent);
+    virtual bool bind(Queue::shared_ptr queue,
+                      const string& routingKey,
+                      const qpid::framing::FieldTable* args);
+
+    void setManagmentAgent(management::ManagementAgent* agent, int qmfVersion);
 
-    virtual ~ManagementExchange();
+    virtual ~ManagementTopicExchange();
 };
 
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org