You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2014/04/07 23:22:32 UTC

svn commit: r1585587 - in /qpid/trunk/qpid/cpp/src: qpid/broker/Consumer.h qpid/broker/Queue.cpp qpid/broker/Queue.h qpid/broker/SemanticState.cpp qpid/broker/SemanticState.h qpid/broker/SessionAdapter.cpp qpid/broker/amqp/Outgoing.cpp tests/QueueTest.cpp

Author: aconway
Date: Mon Apr  7 21:22:32 2014
New Revision: 1585587

URL: http://svn.apache.org/r1585587
Log:
QPID-5667: C++ broker: QMF subscribe events are not raised with AMQP 1.0

The logic that raises QMF subscribe/unsubscribe events lived in AMQP 0-10-specific code (SessionAdapter.cpp).
Moved it into Queue.cpp so that the events are generated regardless of protocol.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
    qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h?rev=1585587&r1=1585586&r2=1585587&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h Mon Apr  7 21:22:32 2014
@@ -49,8 +49,8 @@ class Consumer : public QueueCursor {
  public:
     typedef boost::shared_ptr<Consumer> shared_ptr;
 
-    Consumer(const std::string& _name, SubscriptionType type)
-        : QueueCursor(type), acquires(type == CONSUMER), inListeners(false), name(_name) {}
+    Consumer(const std::string& _name, SubscriptionType type, const std::string& _tag)
+        : QueueCursor(type), acquires(type == CONSUMER), inListeners(false), name(_name), tag(_tag) {}
     virtual ~Consumer(){}
 
     bool preAcquires() const { return acquires; }
@@ -88,8 +88,12 @@ class Consumer : public QueueCursor {
 
     QueueCursor getCursor() const { return *this; }
     void setCursor(const QueueCursor& qc) { static_cast<QueueCursor&>(*this) = qc; }
+
+    const std::string& getTag() const { return tag; }
+
   protected:
     //framing::SequenceNumber position;
+    const std::string tag;  // <destination> from AMQP 0-10 Message.subscribe command
 
   private:
     friend class QueueListeners;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1585587&r1=1585586&r2=1585587&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Mon Apr  7 21:22:32 2014
@@ -53,6 +53,8 @@
 #include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h"
 #include "qmf/org/apache/qpid/broker/ArgsQueueReroute.h"
 #include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
+#include "qmf/org/apache/qpid/broker/EventSubscribe.h"
+#include "qmf/org/apache/qpid/broker/EventUnsubscribe.h"
 
 #include <iostream>
 #include <algorithm>
@@ -534,7 +536,9 @@ void Queue::releaseFromUse(bool controll
     if (trydelete) scheduleAutoDelete();
 }
 
-void Queue::consume(Consumer::shared_ptr c, bool requestExclusive)
+void Queue::consume(Consumer::shared_ptr c, bool requestExclusive,
+                    const framing::FieldTable& arguments,
+                    const std::string& connectionId, const std::string& userId)
 {
     {
         Mutex::ScopedLock locker(messageLock);
@@ -573,9 +577,15 @@ void Queue::consume(Consumer::shared_ptr
     if (mgmtObject != 0 && c->isCounted()) {
         mgmtObject->inc_consumerCount();
     }
+    ManagementAgent* agent = broker->getManagementAgent();
+    if (agent) {
+        agent->raiseEvent(
+            _qmf::EventSubscribe(connectionId, userId, name,
+                                 c->getTag(), requestExclusive, ManagementAgent::toMap(arguments)));
+    }
 }
 
-void Queue::cancel(Consumer::shared_ptr c)
+void Queue::cancel(Consumer::shared_ptr c, const std::string& connectionId, const std::string& userId)
 {
     removeListener(c);
     if(c->isCounted())
@@ -599,6 +609,8 @@ void Queue::cancel(Consumer::shared_ptr 
             scheduleAutoDelete();
         }
     }
+    ManagementAgent* agent = broker->getManagementAgent();
+    if (agent) agent->raiseEvent(_qmf::EventUnsubscribe(connectionId, userId, c->getTag()));
 }
 
 /**

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=1585587&r1=1585586&r2=1585587&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Mon Apr  7 21:22:32 2014
@@ -348,8 +348,14 @@ class Queue : public boost::enable_share
     QPID_BROKER_EXTERN void recover(Message& msg);
 
     QPID_BROKER_EXTERN void consume(Consumer::shared_ptr c,
-                                    bool exclusive = false);
-    QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c);
+                                    bool exclusive = false,
+                                    const framing::FieldTable& arguments = framing::FieldTable(),
+                                    const std::string& connectionId=std::string(),
+                                    const std::string& userId=std::string());
+
+    QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c,
+                                    const std::string& connectionId=std::string(),
+                                    const std::string& userId=std::string());
     /**
      * Used to indicate that the queue is being used in some other
      * context than by a subscriber. The controlling flag should only

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1585587&r1=1585586&r2=1585587&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Mon Apr  7 21:22:32 2014
@@ -141,7 +141,7 @@ void SemanticState::consume(const string
         c = ConsumerImpl::shared_ptr(
             new ConsumerImpl(this, name, queue, ackRequired, acquire ? CONSUMER : BROWSER, exclusive, tag,
                              resumeId, resumeTtl, arguments));
-    queue->consume(c, exclusive);//may throw exception
+    queue->consume(c, exclusive, arguments, connectionId, userID);//may throw exception
     consumers[tag] = c;
 }
 
@@ -323,7 +323,7 @@ SemanticStateConsumerImpl::SemanticState
                                           const framing::FieldTable& _arguments
 
 ) :
-Consumer(_name, type),
+    Consumer(_name, type, _tag),
     parent(_parent),
     queue(_queue),
     ackExpected(ack),
@@ -331,7 +331,6 @@ Consumer(_name, type),
     blocked(true),
     exclusive(_exclusive),
     resumeId(_resumeId),
-    tag(_tag),
     selector(returnSelector(_arguments.getAsString(APACHE_SELECTOR))),
     resumeTtl(_resumeTtl),
     arguments(_arguments),
@@ -472,7 +471,7 @@ void SemanticState::cancel(ConsumerImpl:
     disable(c);
     Queue::shared_ptr queue = c->getQueue();
     if(queue) {
-        queue->cancel(c);
+        queue->cancel(c, connectionId, userID);
     }
     c->cancel();
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=1585587&r1=1585586&r2=1585587&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Mon Apr  7 21:22:32 2014
@@ -209,7 +209,6 @@ class SemanticStateConsumerImpl : public
     bool blocked;
     bool exclusive;
     std::string resumeId;
-    const std::string tag;  // <destination> from AMQP 0-10 Message.subscribe command
     boost::shared_ptr<Selector> selector;
     uint64_t resumeTtl;
     framing::FieldTable arguments;
@@ -270,7 +269,6 @@ class SemanticStateConsumerImpl : public
     bool isAcquire() const { return acquire; }
     bool isExclusive() const { return exclusive; }
     std::string getResumeId() const { return resumeId; };
-    const std::string& getTag() const { return tag; }
     uint64_t getResumeTtl() const { return resumeTtl; }
     uint32_t getDeliveryCount() const { return deliveryCount; }
     void setDeliveryCount(uint32_t _deliveryCount) { deliveryCount = _deliveryCount; }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=1585587&r1=1585586&r2=1585587&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Mon Apr  7 21:22:32 2014
@@ -29,15 +29,7 @@
 #include "qpid/log/Statement.h"
 #include "qpid/management/ManagementAgent.h"
 #include "qpid/broker/SessionState.h"
-#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
-#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
-#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
-#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
-#include "qmf/org/apache/qpid/broker/EventBind.h"
-#include "qmf/org/apache/qpid/broker/EventUnbind.h"
-#include "qmf/org/apache/qpid/broker/EventSubscribe.h"
-#include "qmf/org/apache/qpid/broker/EventUnsubscribe.h"
-#include <boost/format.hpp>
+ #include <boost/format.hpp>
 #include <boost/cast.hpp>
 #include <boost/bind.hpp>
 
@@ -50,7 +42,6 @@ using namespace qpid;
 using namespace qpid::framing;
 using namespace qpid::framing::dtx;
 using namespace qpid::management;
-namespace _qmf = qmf::org::apache::qpid::broker;
 
 typedef std::vector<Queue::shared_ptr> QueueVector;
 
@@ -425,10 +416,6 @@ SessionAdapter::MessageHandlerImpl::subs
                   acceptMode == 0, acquireMode == 0, exclusive,
                   resumeId, resumeTtl, arguments);
 
-    ManagementAgent* agent = getBroker().getManagementAgent();
-    if (agent)
-        agent->raiseEvent(_qmf::EventSubscribe(getConnection().getMgmtId(), getConnection().getUserId(),
-                                               queueName, destination, exclusive, ManagementAgent::toMap(arguments)));
     QPID_LOG_CAT(debug, model, "Create subscription. queue:" << queueName
         << " destination:" << destination
         << " user:" << getConnection().getUserId()
@@ -443,10 +430,6 @@ SessionAdapter::MessageHandlerImpl::canc
     if (!state.cancel(destination)) {
         throw NotFoundException(QPID_MSG("No such subscription: " << destination));
     }
-
-    ManagementAgent* agent = getBroker().getManagementAgent();
-    if (agent)
-        agent->raiseEvent(_qmf::EventUnsubscribe(getConnection().getMgmtId(), getConnection().getUserId(), destination));
     QPID_LOG_CAT(debug, model, "Delete subscription. destination:" << destination
         << " user:" << getConnection().getUserId()
         << " rhost:" << getConnection().getMgmtId() );

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp?rev=1585587&r1=1585586&r2=1585587&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp Mon Apr  7 21:22:32 2014
@@ -59,7 +59,7 @@ bool requested_unreliable(pn_link_t* lin
 OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, const std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session& session,
                                      qpid::sys::OutputControl& o, SubscriptionType type, bool e, bool p)
     : Outgoing(broker, session, source, target, pn_link_name(l)),
-      Consumer(pn_link_name(l), type),
+      Consumer(pn_link_name(l), type, target),
       exclusive(e),
       isControllingUser(p),
       queue(q), deliveries(5000), link(l), out(o),

Modified: qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=1585587&r1=1585586&r2=1585587&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Mon Apr  7 21:22:32 2014
@@ -64,7 +64,7 @@ public:
     QueueCursor lastCursor;
     Message lastMessage;
     bool received;
-    TestConsumer(std::string name="test", bool acquire = true) : Consumer(name, acquire ? CONSUMER : BROWSER), received(false) {};
+    TestConsumer(std::string name="test", bool acquire = true) : Consumer(name, acquire ? CONSUMER : BROWSER, ""), received(false) {};
 
     virtual bool deliver(const QueueCursor& cursor, const Message& message){
         lastCursor = cursor;



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org