You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/10/23 18:21:56 UTC
svn commit: r707406 - in /incubator/qpid/trunk/qpid/cpp/src/qpid:
broker/DeliveryRecord.cpp broker/DeliveryRecord.h broker/Queue.cpp
broker/Queue.h broker/SemanticState.cpp broker/SemanticState.h
client/FlowControl.h cluster/DumpClient.cpp log/Selector.h
Author: aconway
Date: Thu Oct 23 09:21:56 2008
New Revision: 707406
URL: http://svn.apache.org/viewvc?rev=707406&view=rev
Log:
Minor changes to provide access for cluster to replicate delivery records.
- broker::Queue: find message by position, set position.
- broker::SemanticState: make record() public, add eachUnacked(), fix typo "NotifyEnabld"
- broker::DeliveryRecord: added more public accessors
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/FlowControl.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.h
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=707406&r1=707405&r2=707406&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Thu Oct 23 09:21:56 2008
@@ -31,17 +31,16 @@
using std::string;
DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg,
- Queue::shared_ptr _queue,
- const std::string _tag,
- bool _acquired, bool accepted,
+ const Queue::shared_ptr& _queue,
+ const std::string& _tag,
+ bool _acquired,
+ bool accepted,
bool _windowing) : msg(_msg),
queue(_queue),
tag(_tag),
acquired(_acquired),
acceptExpected(!accepted),
cancelled(false),
- credit(msg.payload ? msg.payload->getRequiredCredit() : 0),
- size(msg.payload ? msg.payload->contentSize() : 0),
completed(false),
ended(accepted),
windowing(_windowing)
@@ -154,7 +153,7 @@
uint32_t DeliveryRecord::getCredit() const
{
- return credit;
+ return msg.payload ? msg.payload->getRequiredCredit() : 0;
}
void DeliveryRecord::acquire(DeliveryIds& results) {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h?rev=707406&r1=707405&r2=707406&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h Thu Oct 23 09:21:56 2008
@@ -46,17 +46,21 @@
bool acquired;
bool acceptExpected;
bool cancelled;
- const uint32_t credit;
- const uint64_t size;
bool completed;
bool ended;
const bool windowing;
public:
- DeliveryRecord(const QueuedMessage& msg, Queue::shared_ptr queue,
- const std::string tag,
- bool acquired, bool confirmed, bool windowing);
+ DeliveryRecord(
+ const QueuedMessage& msg,
+ const Queue::shared_ptr& queue,
+ const std::string& tag,
+ bool acquired,
+ bool accepted,
+ bool windowing
+ );
+
bool matches(DeliveryId tag) const;
bool matchOrAfter(DeliveryId tag) const;
bool after(DeliveryId tag) const;
@@ -76,13 +80,21 @@
bool isAcquired() const { return acquired; }
bool isComplete() const { return completed; }
bool isRedundant() const { return ended && (!windowing || completed); }
-
+ bool isCancelled() const { return cancelled; }
+ bool isAccepted() const { return !acceptExpected; }
+ bool isEnded() const { return ended; }
+ bool isWindowing() const { return windowing; }
+
uint32_t getCredit() const;
- const std::string& getTag() const { return tag; }
+ const std::string& getTag() const { return tag; }
void deliver(framing::FrameHandler& h, DeliveryId deliveryId, uint16_t framesize);
void setId(DeliveryId _id) { id = _id; }
+ const QueuedMessage& getMessage() const { return msg; }
+ framing::SequenceNumber getId() const { return id; }
+ Queue::shared_ptr getQueue() const { return queue; }
+
friend bool operator<(const DeliveryRecord&, const DeliveryRecord&);
friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&);
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=707406&r1=707405&r2=707406&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Thu Oct 23 09:21:56 2008
@@ -365,6 +365,25 @@
return false;
}
+namespace {
+struct PositionEquals {
+ SequenceNumber pos;
+ PositionEquals(SequenceNumber p) : pos(p) {}
+ bool operator()(const QueuedMessage& msg) const { return msg.position == pos; }
+};
+}// namespace
+
+bool Queue::find(QueuedMessage& msg, SequenceNumber pos) const {
+ Mutex::ScopedLock locker(messageLock);
+ Messages::const_iterator i = std::find_if(messages.begin(), messages.end(), PositionEquals(pos));
+ if (i == messages.end())
+ return false;
+ else {
+ msg = *i;
+ return true;
+ }
+}
+
void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
Mutex::ScopedLock locker(consumerLock);
if(exclusive) {
@@ -827,3 +846,11 @@
return status;
}
+
+void Queue::setPosition(SequenceNumber n) {
+ if (n <= sequence)
+ throw InvalidArgumentException(QPID_MSG("Invalid position " << n << " < " << sequence
+ << " for queue " << name));
+ sequence = n;
+ --sequence; // Decrement so ++sequence will return n.
+}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=707406&r1=707405&r2=707406&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Thu Oct 23 09:21:56 2008
@@ -66,7 +66,7 @@
typedef std::list<Consumer::shared_ptr> Listeners;
typedef std::deque<QueuedMessage> Messages;
- typedef std::map<string,QueuedMessage*> LVQ;
+ typedef std::map<string,QueuedMessage*> LVQ;
const string name;
const bool autodelete;
@@ -95,7 +95,7 @@
boost::shared_ptr<Exchange> alternateExchange;
framing::SequenceNumber sequence;
qmf::org::apache::qpid::broker::Queue* mgmtObject;
- RateTracker dequeueTracker;
+RateTracker dequeueTracker;
void push(boost::intrusive_ptr<Message>& msg);
void setPolicy(std::auto_ptr<QueuePolicy> policy);
@@ -227,6 +227,13 @@
*/
QueuedMessage get();
+ /** Get the message at position pos
+ *@param msg out parameter, assigned to the message found.
+ *@param pos position to search for.
+ *@return True if there is a message at pos, false otherwise.
+ */
+ bool find(QueuedMessage& msg, framing::SequenceNumber pos) const;
+
const QueuePolicy* getPolicy();
void setAlternateExchange(boost::shared_ptr<Exchange> exchange);
@@ -264,6 +271,11 @@
void popMsg(QueuedMessage& qmsg);
+ /** Set the position sequence number for the next message on the queue.
+ * Must be >= the current sequence number.
+ * Used by cluster to replicate queues.
+ */
+ void setPosition(framing::SequenceNumber pos);
};
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=707406&r1=707405&r2=707406&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Thu Oct 23 09:21:56 2008
@@ -610,7 +610,7 @@
notifyEnabled = false;
}
-bool SemanticState::ConsumerImpl::isNotifyEnabld() {
+bool SemanticState::ConsumerImpl::isNotifyEnabled() const {
Mutex::ScopedLock l(lock);
return notifyEnabled;
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=707406&r1=707405&r2=707406&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Thu Oct 23 09:21:56 2008
@@ -62,7 +62,7 @@
class ConsumerImpl : public Consumer, public sys::OutputTask,
public boost::enable_shared_from_this<ConsumerImpl>
{
- qpid::sys::Mutex lock;
+ mutable qpid::sys::Mutex lock;
SemanticState* const parent;
const string name;
const Queue::shared_ptr queue;
@@ -97,7 +97,7 @@
void disableNotify();
void enableNotify();
void notify();
- bool isNotifyEnabld();
+ bool isNotifyEnabled() const;
void setWindowMode();
void setCreditMode();
@@ -106,7 +106,7 @@
void flush();
void stop();
void complete(DeliveryRecord&);
- Queue::shared_ptr getQueue() { return queue; }
+ Queue::shared_ptr getQueue() const { return queue; }
bool isBlocked() const { return blocked; }
bool setBlocked(bool set) { std::swap(set, blocked); return set; }
@@ -147,7 +147,6 @@
const string userID;
void route(boost::intrusive_ptr<Message> msg, Deliverable& strategy);
- void record(const DeliveryRecord& delivery);
void checkDtxTimeout();
void complete(DeliveryRecord&);
@@ -213,8 +212,13 @@
void attached();
void detached();
+ // Used by cluster to re-create replica sessions
static ConsumerImpl* castToConsumerImpl(OutputTask* p) { return boost::polymorphic_downcast<ConsumerImpl*>(p); }
+
template <class F> void eachConsumer(F f) { outputTasks.eachOutput(boost::bind(f, boost::bind(castToConsumerImpl, _1))); }
+ template <class F> void eachUnacked(F f) { std::for_each(unacked.begin(), unacked.end(), f); }
+
+ void record(const DeliveryRecord& delivery);
};
}} // namespace qpid::broker
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FlowControl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FlowControl.h?rev=707406&r1=707405&r2=707406&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FlowControl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FlowControl.h Thu Oct 23 09:21:56 2008
@@ -27,7 +27,7 @@
/**
* Flow control works by associating a finite amount of "credit"
- * associated with a subscription.
+ * with a subscription.
*
* Credit includes a message count and a byte count. Each message
* received decreases the message count by one, and the byte count by
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp?rev=707406&r1=707405&r2=707406&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp Thu Oct 23 09:21:56 2008
@@ -241,7 +241,7 @@
ProtocolVersion(),
ci->getName(),
ci->isBlocked(),
- ci->isNotifyEnabld()
+ ci->isNotifyEnabled()
);
client::SessionBase_0_10Access(shadowSession).get()->send(state);
QPID_LOG(debug, dumperId << " dumped consumer " << ci->getName() << " on " << shadowSession.getId());
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.h?rev=707406&r1=707405&r2=707406&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.h Thu Oct 23 09:21:56 2008
@@ -56,7 +56,7 @@
/** Enable based on a 'level[+]:file' string */
void enable(const std::string& enableStr);
- /** True if level is enabld for file. */
+ /** True if level is enabled for file. */
bool isEnabled(Level level, const std::string& function);
private: