You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/10/23 18:21:56 UTC

svn commit: r707406 - in /incubator/qpid/trunk/qpid/cpp/src/qpid: broker/DeliveryRecord.cpp broker/DeliveryRecord.h broker/Queue.cpp broker/Queue.h broker/SemanticState.cpp broker/SemanticState.h client/FlowControl.h cluster/DumpClient.cpp log/Selector.h

Author: aconway
Date: Thu Oct 23 09:21:56 2008
New Revision: 707406

URL: http://svn.apache.org/viewvc?rev=707406&view=rev
Log:

Minor changes to provide access for cluster to replicate delivery records.

 - broker::Queue: find message by position, set position.
 - broker::SemanticState: make record() public, add eachUnacked(), fix typo "NotifyEnabld"
 - broker::DeliveryRecord: added more public accessors

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FlowControl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=707406&r1=707405&r2=707406&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Thu Oct 23 09:21:56 2008
@@ -31,17 +31,16 @@
 using std::string;
 
 DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg, 
-                               Queue::shared_ptr _queue, 
-                               const std::string _tag,
-                               bool _acquired, bool accepted, 
+                               const Queue::shared_ptr& _queue, 
+                               const std::string& _tag,
+                               bool _acquired,
+                               bool accepted, 
                                bool _windowing) : msg(_msg), 
                                                   queue(_queue), 
                                                   tag(_tag),
                                                   acquired(_acquired),
                                                   acceptExpected(!accepted),
                                                   cancelled(false),
-                                                  credit(msg.payload ? msg.payload->getRequiredCredit() : 0),
-                                                  size(msg.payload ? msg.payload->contentSize() : 0),
                                                   completed(false),
                                                   ended(accepted),
                                                   windowing(_windowing)
@@ -154,7 +153,7 @@
 
 uint32_t DeliveryRecord::getCredit() const
 {
-    return credit;
+    return msg.payload ? msg.payload->getRequiredCredit() : 0;   // asked of the payload on each call; 0 when there is no payload
 }
 
 void DeliveryRecord::acquire(DeliveryIds& results) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h?rev=707406&r1=707405&r2=707406&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h Thu Oct 23 09:21:56 2008
@@ -46,17 +46,21 @@
     bool acquired;
     bool acceptExpected;
     bool cancelled;
-    const uint32_t credit;
-    const uint64_t size;
 
     bool completed;
     bool ended;
     const bool windowing;
 
   public:
-    DeliveryRecord(const QueuedMessage& msg, Queue::shared_ptr queue, 
-                   const std::string tag,
-                   bool acquired, bool confirmed, bool windowing);
+    DeliveryRecord(
+        const QueuedMessage& msg,
+        const Queue::shared_ptr& queue, 
+        const std::string& tag,
+        bool acquired,
+        bool accepted,      // when true, no accept is expected and the record starts out ended
+        bool windowing      // when true, the record is redundant only once completed as well as ended
+    );
+    
     bool matches(DeliveryId tag) const;
     bool matchOrAfter(DeliveryId tag) const;
     bool after(DeliveryId tag) const;
@@ -76,13 +80,21 @@
     bool isAcquired() const { return acquired; }
     bool isComplete() const { return completed; }
     bool isRedundant() const { return ended && (!windowing || completed); }
-
+    bool isCancelled() const { return cancelled; }
+    bool isAccepted() const { return !acceptExpected; }    // accepted means no accept is still expected
+    bool isEnded() const { return ended; }
+    bool isWindowing() const { return windowing; }         // windowing is a const member, fixed at construction
+    
     uint32_t getCredit() const;
-    const std::string& getTag() const { return tag; } 
+    const std::string& getTag() const { return tag; }
 
     void deliver(framing::FrameHandler& h, DeliveryId deliveryId, uint16_t framesize);
     void setId(DeliveryId _id) { id = _id; }
 
+    const QueuedMessage& getMessage() const { return msg; }    // reference to this record's own msg member
+    framing::SequenceNumber getId() const { return id; }       // current value of id (see setId)
+    Queue::shared_ptr getQueue() const { return queue; }       // returns a copy of the shared_ptr
+
     friend bool operator<(const DeliveryRecord&, const DeliveryRecord&);         
     friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&);
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=707406&r1=707405&r2=707406&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Thu Oct 23 09:21:56 2008
@@ -365,6 +365,25 @@
     return false;
 }
 
+namespace {
+struct PositionEquals {         // Predicate: true for a QueuedMessage whose position equals pos.
+    SequenceNumber pos;         // the position to match
+    explicit PositionEquals(SequenceNumber p) : pos(p) {}   // explicit: never built by implicit conversion
+    bool operator()(const QueuedMessage& msg) const { return msg.position == pos; }
+};
+}// namespace
+
+// Copy the queued message at position pos into msg; msg is not touched when nothing matches.
+bool Queue::find(QueuedMessage& msg, SequenceNumber pos) const {
+    Mutex::ScopedLock locker(messageLock);      // messageLock is taken before messages is read
+    const Messages::const_iterator end = messages.end();
+    const Messages::const_iterator hit = std::find_if(messages.begin(), end, PositionEquals(pos));
+    const bool found = (hit != end);
+    if (found)
+        msg = *hit;
+    return found;
+}
+
 void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
     Mutex::ScopedLock locker(consumerLock);
     if(exclusive) {
@@ -827,3 +846,11 @@
 
     return status;
 }
+
+void Queue::setPosition(SequenceNumber n) {     // n: the position the next ++sequence should yield
+    if (n <= sequence)          // equality is rejected as well, hence "<=" in the message
+        throw InvalidArgumentException(QPID_MSG("Invalid position " << n << " <= " << sequence
+                                                      << " for queue " << name));
+    sequence = n;
+    --sequence;                 // Decrement so ++sequence will return n.
+}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=707406&r1=707405&r2=707406&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Thu Oct 23 09:21:56 2008
@@ -66,7 +66,7 @@
 
             typedef std::list<Consumer::shared_ptr> Listeners;
             typedef std::deque<QueuedMessage> Messages;
-			typedef std::map<string,QueuedMessage*> LVQ;
+            typedef std::map<string,QueuedMessage*> LVQ;
 
             const string name;
             const bool autodelete;
@@ -95,7 +95,7 @@
             boost::shared_ptr<Exchange> alternateExchange;
             framing::SequenceNumber sequence;
             qmf::org::apache::qpid::broker::Queue* mgmtObject;
-            RateTracker dequeueTracker;
+            RateTracker dequeueTracker;
 
             void push(boost::intrusive_ptr<Message>& msg);
             void setPolicy(std::auto_ptr<QueuePolicy> policy);
@@ -227,6 +227,13 @@
              */
             QueuedMessage get();
 
+            /** Get the message at position pos.
+             *@param msg out parameter, assigned the message found; left unchanged if none is found.
+             *@param pos position to search for.
+             *@return True if there is a message at pos, false otherwise.
+             */
+            bool find(QueuedMessage& msg, framing::SequenceNumber pos) const;
+
             const QueuePolicy* getPolicy();
 
             void setAlternateExchange(boost::shared_ptr<Exchange> exchange);
@@ -264,6 +271,11 @@
 
             void popMsg(QueuedMessage& qmsg);
 
+            /** Set the position sequence number for the next message on the queue.
+             * Must be > the current sequence number; otherwise InvalidArgumentException is thrown.
+             * Used by cluster to replicate queues.
+             */
+            void setPosition(framing::SequenceNumber pos);
         };
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=707406&r1=707405&r2=707406&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Thu Oct 23 09:21:56 2008
@@ -610,7 +610,7 @@
     notifyEnabled = false;
 }
 
-bool SemanticState::ConsumerImpl::isNotifyEnabld() {
+bool SemanticState::ConsumerImpl::isNotifyEnabled() const {   // takes 'lock' before reading notifyEnabled
     Mutex::ScopedLock l(lock);
     return notifyEnabled;
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=707406&r1=707405&r2=707406&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Thu Oct 23 09:21:56 2008
@@ -62,7 +62,7 @@
     class ConsumerImpl : public Consumer, public sys::OutputTask,
                          public boost::enable_shared_from_this<ConsumerImpl>
     {
-        qpid::sys::Mutex lock;
+        mutable qpid::sys::Mutex lock;
         SemanticState* const parent;
         const string name;
         const Queue::shared_ptr queue;
@@ -97,7 +97,7 @@
         void disableNotify();
         void enableNotify();
         void notify();
-        bool isNotifyEnabld();
+        bool isNotifyEnabled() const;
 
         void setWindowMode();
         void setCreditMode();
@@ -106,7 +106,7 @@
         void flush();
         void stop();
         void complete(DeliveryRecord&);    
-        Queue::shared_ptr getQueue() { return queue; }
+        Queue::shared_ptr getQueue() const { return queue; }
         bool isBlocked() const { return blocked; }
         bool setBlocked(bool set) { std::swap(set, blocked); return set; }
 
@@ -147,7 +147,6 @@
     const string userID;
 
     void route(boost::intrusive_ptr<Message> msg, Deliverable& strategy);
-    void record(const DeliveryRecord& delivery);
     void checkDtxTimeout();
 
     void complete(DeliveryRecord&);
@@ -213,8 +212,13 @@
     void attached();
     void detached();
 
+    // Used by cluster to re-create replica sessions
     static ConsumerImpl* castToConsumerImpl(OutputTask* p) { return boost::polymorphic_downcast<ConsumerImpl*>(p); }
+
     template <class F> void eachConsumer(F f) { outputTasks.eachOutput(boost::bind(f, boost::bind(castToConsumerImpl, _1))); }
+    template <class F> void eachUnacked(F f) { std::for_each(unacked.begin(), unacked.end(), f); }  // calls f on every entry of unacked
+
+    void record(const DeliveryRecord& delivery);    // made public by this change (see commit log)
 };
 
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FlowControl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FlowControl.h?rev=707406&r1=707405&r2=707406&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FlowControl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FlowControl.h Thu Oct 23 09:21:56 2008
@@ -27,7 +27,7 @@
 
 /**
  * Flow control works by associating a finite amount of "credit"
- * associated with a subscription.
+ * with a subscription.
  *
  * Credit includes a message count and a byte count. Each message
  * received decreases the message count by one, and the byte count by

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp?rev=707406&r1=707405&r2=707406&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp Thu Oct 23 09:21:56 2008
@@ -241,7 +241,7 @@
         ProtocolVersion(),
         ci->getName(),
         ci->isBlocked(),
-        ci->isNotifyEnabld()
+        ci->isNotifyEnabled()
     );
     client::SessionBase_0_10Access(shadowSession).get()->send(state);
     QPID_LOG(debug, dumperId << " dumped consumer " << ci->getName() << " on " << shadowSession.getId());

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.h?rev=707406&r1=707405&r2=707406&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.h Thu Oct 23 09:21:56 2008
@@ -56,7 +56,7 @@
     /** Enable based on a 'level[+]:file' string */
     void enable(const std::string& enableStr);
 
-    /** True if level is enabld for file. */
+    /** True if level is enabled for file. */
     bool isEnabled(Level level, const std::string& function);
 
   private: