You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cc...@apache.org on 2009/11/13 20:30:07 UTC

svn commit: r835962 - in /qpid/trunk/qpid: cpp/src/qpid/broker/SemanticState.cpp cpp/src/qpid/broker/SemanticState.h specs/management-schema.xml

Author: cctrieloff
Date: Fri Nov 13 19:30:07 2009
New Revision: 835962

URL: http://svn.apache.org/viewvc?rev=835962&view=rev
Log:
Add management subscription object

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
    qpid/trunk/qpid/specs/management-schema.xml

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=835962&r1=835961&r2=835962&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Fri Nov 13 19:30:07 2009
@@ -58,6 +58,11 @@
 using namespace qpid::framing;
 using namespace qpid::sys;
 using qpid::ptr_map_ptr;
+using qpid::management::ManagementAgent;
+using qpid::management::ManagementObject;
+using qpid::management::Manageable;
+using qpid::management::Args;
+namespace _qmf = qmf::org::apache::qpid::broker;
 
 SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss)
     : session(ss),
@@ -261,8 +266,38 @@
     byteCredit(0),
     notifyEnabled(true),
     syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)),
-    deliveryCount(0)
-{}
+    deliveryCount(0),
+    mgmtObject(0)
+{
+    if (parent != 0 && queue.get() != 0 && queue->GetManagementObject() !=0)
+    {
+        ManagementAgent* agent = parent->session.getBroker().getManagementAgent();
+        qpid::management::Manageable* ms = dynamic_cast<qpid::management::Manageable*> (&(parent->session));
+        
+        if (agent != 0)
+        {
+            mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId() ,name ,arguments,
+                acquire, ackExpected, syncFrequency, resumeId, resumeTtl, exclusive);
+            agent->addObject (mgmtObject, agent->allocateId(this));
+            mgmtObject->set_mode("WINDOW");
+        }
+    }
+}
+
+ManagementObject* SemanticState::ConsumerImpl::GetManagementObject (void) const
+{
+    return (ManagementObject*) mgmtObject;
+}
+
+Manageable::status_t SemanticState::ConsumerImpl::ManagementMethod (uint32_t methodId, Args&, string&)
+{
+    Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
+
+    QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]");
+
+    return status;
+}
+
 
 OwnershipToken* SemanticState::ConsumerImpl::getSession()
 {
@@ -283,6 +318,7 @@
     if (acquire && !ackExpected) {
         queue->dequeue(0, msg);
     }
+    if (mgmtObject) { mgmtObject->inc_delivered(); }
     return true;
 }
 
@@ -299,6 +335,7 @@
     // in future.
     // 
     blocked = !(filter(msg) && checkCredit(msg));
+    if (mgmtObject && !blocked && acquire) { mgmtObject->inc_accepted(); }
     return !blocked;
 }
 
@@ -341,7 +378,11 @@
     return enoughCredit;
 }
 
-SemanticState::ConsumerImpl::~ConsumerImpl() {}
+SemanticState::ConsumerImpl::~ConsumerImpl() 
+{
+    if (mgmtObject != 0)
+        mgmtObject->resourceDestroy ();
+}
 
 void SemanticState::cancel(ConsumerImpl::shared_ptr c)
 {
@@ -524,11 +565,17 @@
 void SemanticState::ConsumerImpl::setWindowMode()
 {
     windowing = true;
+    if (mgmtObject){
+        mgmtObject->set_mode("WINDOW");
+    }
 }
 
 void SemanticState::ConsumerImpl::setCreditMode()
 {
     windowing = false;
+    if (mgmtObject){
+        mgmtObject->set_mode("CREDIT");
+    }
 }
 
 void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=835962&r1=835961&r2=835962&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Fri Nov 13 19:30:07 2009
@@ -38,6 +38,7 @@
 #include "qpid/sys/Mutex.h"
 #include "qpid/sys/AtomicValue.h"
 #include "qpid/broker/AclModule.h"
+#include "qmf/org/apache/qpid/broker/Subscription.h"
 
 #include <list>
 #include <map>
@@ -58,7 +59,8 @@
 class SemanticState : private boost::noncopyable {
   public:
     class ConsumerImpl : public Consumer, public sys::OutputTask,
-                         public boost::enable_shared_from_this<ConsumerImpl>
+                         public boost::enable_shared_from_this<ConsumerImpl>,
+                         public management::Manageable
     {
         mutable qpid::sys::Mutex lock;
         SemanticState* const parent;
@@ -77,6 +79,7 @@
         bool notifyEnabled;
         const int syncFrequency;
         int deliveryCount;
+        qmf::org::apache::qpid::broker::Subscription* mgmtObject;
 
         bool checkCredit(boost::intrusive_ptr<Message>& msg);
         void allocateCredit(boost::intrusive_ptr<Message>& msg);
@@ -130,6 +133,9 @@
 
         SemanticState& getParent() { return *parent; }
         const SemanticState& getParent() const { return *parent; }
+        // Manageable entry points
+        management::ManagementObject* GetManagementObject (void) const;
+        management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
     };
 
   private:

Modified: qpid/trunk/qpid/specs/management-schema.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/specs/management-schema.xml?rev=835962&r1=835961&r2=835962&view=diff
==============================================================================
--- qpid/trunk/qpid/specs/management-schema.xml (original)
+++ qpid/trunk/qpid/specs/management-schema.xml Fri Nov 13 19:30:07 2009
@@ -200,6 +200,27 @@
   
   <!--
   ===============================================================
+  Subscription
+  ===============================================================
+  -->
+  <class name="Subscription">
+    <property name="sessionRef"     type="objId"    references="Exchange" access="RC" index="y" parentRef="y"/>
+    <property name="queueRef"       type="objId"    references="Session"   access="RC" index="y"/>
+    <property name="name"           type="sstr"     access="RC" index="y"/>
+    <property name="arguments"      type="map"      access="RC"/>
+    <property name="acquire"        type="bool"     access="RC"/>
+    <property name="ackExpected"    type="bool"     access="RC"/>
+    <property name="syncFrequency"  type="uint32"   access="RC"/>
+    <property name="resumeID"       type="sstr"     access="RC"/>
+    <property name="resumeTTL"      type="uint64"   access="RC"/>
+    <property name="exclusive"      type="bool"     access="RC"/>
+    <property name="mode"           type="sstr"     access="RO" desc="WINDOW or CREDIT"/>
+    <statistic name="delivered"     type="count64"  unit="message" desc="Messages delivered"/>
+    <statistic name="accepted"      type="count64"  unit="message" desc="Messages accepted"/>
+  </class>
+  
+  <!--
+  ===============================================================
   Connection
   ===============================================================
   -->



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org