You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cc...@apache.org on 2009/11/13 20:30:07 UTC
svn commit: r835962 - in /qpid/trunk/qpid:
cpp/src/qpid/broker/SemanticState.cpp cpp/src/qpid/broker/SemanticState.h
specs/management-schema.xml
Author: cctrieloff
Date: Fri Nov 13 19:30:07 2009
New Revision: 835962
URL: http://svn.apache.org/viewvc?rev=835962&view=rev
Log:
Add management subscription object
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
qpid/trunk/qpid/specs/management-schema.xml
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=835962&r1=835961&r2=835962&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Fri Nov 13 19:30:07 2009
@@ -58,6 +58,11 @@
using namespace qpid::framing;
using namespace qpid::sys;
using qpid::ptr_map_ptr;
+using qpid::management::ManagementAgent;
+using qpid::management::ManagementObject;
+using qpid::management::Manageable;
+using qpid::management::Args;
+namespace _qmf = qmf::org::apache::qpid::broker;
SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss)
: session(ss),
@@ -261,8 +266,38 @@
byteCredit(0),
notifyEnabled(true),
syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)),
- deliveryCount(0)
-{}
+ deliveryCount(0),
+ mgmtObject(0)
+{
+ if (parent != 0 && queue.get() != 0 && queue->GetManagementObject() !=0)
+ {
+ ManagementAgent* agent = parent->session.getBroker().getManagementAgent();
+ qpid::management::Manageable* ms = dynamic_cast<qpid::management::Manageable*> (&(parent->session));
+
+ if (agent != 0)
+ {
+ mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId() ,name ,arguments,
+ acquire, ackExpected, syncFrequency, resumeId, resumeTtl, exclusive);
+ agent->addObject (mgmtObject, agent->allocateId(this));
+ mgmtObject->set_mode("WINDOW");
+ }
+ }
+}
+
+/**
+ * Manageable entry point: expose the management object for this consumer.
+ *
+ * @return mgmtObject as a ManagementObject pointer; null when the
+ *         constructor did not create a Subscription object for this consumer.
+ */
+ManagementObject* SemanticState::ConsumerImpl::GetManagementObject (void) const
+{
+    // A named cast keeps this a checked upcast; the former C-style cast
+    // would have compiled even for unrelated pointer types.
+    return static_cast<ManagementObject*>(mgmtObject);
+}
+
+/**
+ * Manageable entry point: handle a management method call on this consumer.
+ *
+ * This implementation handles no methods: the request is logged at debug
+ * level and rejected. The Args and text parameters are not used.
+ *
+ * @param methodId identifier of the requested method (only logged)
+ * @return always Manageable::STATUS_UNKNOWN_METHOD
+ */
+Manageable::status_t SemanticState::ConsumerImpl::ManagementMethod (uint32_t methodId, Args&, string&)
+{
+    // Log under the real class name; the message previously claimed "Queue".
+    QPID_LOG (debug, "SemanticState::ConsumerImpl::ManagementMethod [id=" << methodId << "]");
+
+    return Manageable::STATUS_UNKNOWN_METHOD;
+}
+
OwnershipToken* SemanticState::ConsumerImpl::getSession()
{
@@ -283,6 +318,7 @@
if (acquire && !ackExpected) {
queue->dequeue(0, msg);
}
+ if (mgmtObject) { mgmtObject->inc_delivered(); }
return true;
}
@@ -299,6 +335,7 @@
// in future.
//
blocked = !(filter(msg) && checkCredit(msg));
+ if (mgmtObject && !blocked && acquire) { mgmtObject->inc_accepted(); }
return !blocked;
}
@@ -341,7 +378,11 @@
return enoughCredit;
}
-SemanticState::ConsumerImpl::~ConsumerImpl() {}
+// Destructor: calls resourceDestroy() on the management object when the
+// constructor created one; otherwise does nothing.
+SemanticState::ConsumerImpl::~ConsumerImpl()
+{
+    if (mgmtObject) {
+        mgmtObject->resourceDestroy ();
+    }
+}
void SemanticState::cancel(ConsumerImpl::shared_ptr c)
{
@@ -524,11 +565,17 @@
void SemanticState::ConsumerImpl::setWindowMode()
{
windowing = true;
+ // Keep the management object's "mode" property in step with the windowing
+ // flag; mgmtObject is null when no Subscription object was created.
+ if (mgmtObject){
+ mgmtObject->set_mode("WINDOW");
+ }
}
void SemanticState::ConsumerImpl::setCreditMode()
{
windowing = false;
+ // Keep the management object's "mode" property in step with the windowing
+ // flag; mgmtObject is null when no Subscription object was created.
+ if (mgmtObject){
+ mgmtObject->set_mode("CREDIT");
+ }
}
void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=835962&r1=835961&r2=835962&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Fri Nov 13 19:30:07 2009
@@ -38,6 +38,7 @@
#include "qpid/sys/Mutex.h"
#include "qpid/sys/AtomicValue.h"
#include "qpid/broker/AclModule.h"
+#include "qmf/org/apache/qpid/broker/Subscription.h"
#include <list>
#include <map>
@@ -58,7 +59,8 @@
class SemanticState : private boost::noncopyable {
public:
class ConsumerImpl : public Consumer, public sys::OutputTask,
- public boost::enable_shared_from_this<ConsumerImpl>
+ public boost::enable_shared_from_this<ConsumerImpl>,
+ public management::Manageable
{
mutable qpid::sys::Mutex lock;
SemanticState* const parent;
@@ -77,6 +79,7 @@
bool notifyEnabled;
const int syncFrequency;
int deliveryCount;
+ qmf::org::apache::qpid::broker::Subscription* mgmtObject;
bool checkCredit(boost::intrusive_ptr<Message>& msg);
void allocateCredit(boost::intrusive_ptr<Message>& msg);
@@ -130,6 +133,9 @@
SemanticState& getParent() { return *parent; }
const SemanticState& getParent() const { return *parent; }
+ // Manageable entry points
+ management::ManagementObject* GetManagementObject (void) const;
+ management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
};
private:
Modified: qpid/trunk/qpid/specs/management-schema.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/specs/management-schema.xml?rev=835962&r1=835961&r2=835962&view=diff
==============================================================================
--- qpid/trunk/qpid/specs/management-schema.xml (original)
+++ qpid/trunk/qpid/specs/management-schema.xml Fri Nov 13 19:30:07 2009
@@ -200,6 +200,27 @@
<!--
===============================================================
+ Subscription
+ ===============================================================
+ -->
+ <class name="Subscription">
+   <!-- sessionRef is the owning session (the parent object); queueRef is the
+        queue the subscription consumes from. The broker constructs this
+        object with the session as parent and the queue's object id. -->
+   <property name="sessionRef" type="objId" references="Session" access="RC" index="y" parentRef="y"/>
+   <property name="queueRef" type="objId" references="Queue" access="RC" index="y"/>
+   <property name="name" type="sstr" access="RC" index="y"/>
+   <property name="arguments" type="map" access="RC"/>
+   <property name="acquire" type="bool" access="RC"/>
+   <property name="ackExpected" type="bool" access="RC"/>
+   <property name="syncFrequency" type="uint32" access="RC"/>
+   <property name="resumeID" type="sstr" access="RC"/>
+   <property name="resumeTTL" type="uint64" access="RC"/>
+   <property name="exclusive" type="bool" access="RC"/>
+   <property name="mode" type="sstr" access="RO" desc="WINDOW or CREDIT"/>
+   <statistic name="delivered" type="count64" unit="message" desc="Messages delivered"/>
+   <statistic name="accepted" type="count64" unit="message" desc="Messages accepted"/>
+ </class>
+
+ <!--
+ ===============================================================
Connection
===============================================================
-->
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org