You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/10/09 10:36:25 UTC
svn commit: r703102 - in /incubator/qpid/trunk/qpid/cpp/src/qpid:
broker/Queue.cpp broker/Queue.h client/QueueOptions.cpp client/QueueOptions.h
Author: gsim
Date: Thu Oct 9 01:36:25 2008
New Revision: 703102
URL: http://svn.apache.org/viewvc?rev=703102&view=rev
Log:
Minor indentation fixes
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/QueueOptions.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/QueueOptions.h
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=703102&r1=703101&r2=703102&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Thu Oct 9 01:36:25 2008
@@ -54,15 +54,15 @@
namespace
{
- const std::string qpidMaxSize("qpid.max_size");
- const std::string qpidMaxCount("qpid.max_count");
- const std::string qpidNoLocal("no-local");
- const std::string qpidTraceIdentity("qpid.trace.id");
- const std::string qpidTraceExclude("qpid.trace.exclude");
- const std::string qpidLastValueQueue("qpid.last_value_queue");
- const std::string qpidOptimisticConsume("qpid.optimistic_consume");
- const std::string qpidPersistLastNode("qpid.persist_last_node");
- const std::string qpidVQMatchProperty("qpid.LVQ_key");
+const std::string qpidMaxSize("qpid.max_size");
+const std::string qpidMaxCount("qpid.max_count");
+const std::string qpidNoLocal("no-local");
+const std::string qpidTraceIdentity("qpid.trace.id");
+const std::string qpidTraceExclude("qpid.trace.exclude");
+const std::string qpidLastValueQueue("qpid.last_value_queue");
+const std::string qpidOptimisticConsume("qpid.optimistic_consume");
+const std::string qpidPersistLastNode("qpid.persist_last_node");
+const std::string qpidVQMatchProperty("qpid.LVQ_key");
}
@@ -81,7 +81,7 @@
lastValueQueue(false),
optimisticConsume(false),
persistLastNode(false),
- inLastNodeFailure(false),
+ inLastNodeFailure(false),
persistenceId(0),
policyExceeded(false),
mgmtObject(0)
@@ -156,7 +156,7 @@
push(msg);
msg->enqueueComplete();
}else {
- push(msg);
+ push(msg);
}
mgntEnqStats(msg);
QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]");
@@ -179,10 +179,10 @@
void Queue::process(boost::intrusive_ptr<Message>& msg){
push(msg);
mgntEnqStats(msg);
- if (mgmtObject != 0){
+ if (mgmtObject != 0){
mgmtObject->inc_msgTxnEnqueues ();
mgmtObject->inc_byteTxnEnqueues (msg->contentSize ());
- }
+ }
}
void Queue::requeue(const QueuedMessage& msg){
@@ -432,9 +432,9 @@
{
if (lastValueQueue){
const framing::FieldTable* ft = qmsg.payload->getApplicationHeaders();
- string key = ft->getString(qpidVQMatchProperty);
- lvq.erase(key);
- }
+ string key = ft->getString(qpidVQMatchProperty);
+ lvq.erase(key);
+ }
messages.pop_front();
}
@@ -446,24 +446,24 @@
if (policy.get()) policy->tryEnqueue(qm);
//if (lastValueQueue && LVQinsert(qm) ) return; // LVQ update of existing message
- LVQ::iterator i;
- if (lastValueQueue){
- const framing::FieldTable* ft = msg->getApplicationHeaders();
- string key = ft->getString(qpidVQMatchProperty);
+ LVQ::iterator i;
+ if (lastValueQueue){
+ const framing::FieldTable* ft = msg->getApplicationHeaders();
+ string key = ft->getString(qpidVQMatchProperty);
- i = lvq.find(key);
- if (i == lvq.end()){
+ i = lvq.find(key);
+ if (i == lvq.end()){
messages.push_back(qm);
listeners.swap(copy);
- lvq[key] = &messages.back();
- }else {
- i->second->payload = msg;
- }
- }else {
+ lvq[key] = &messages.back();
+ }else {
+ i->second->payload = msg;
+ }
+ }else {
messages.push_back(qm);
listeners.swap(copy);
- }
+ }
}
for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify));
}
@@ -501,31 +501,31 @@
void Queue::setLastNodeFailure()
{
if (persistLastNode){
- Mutex::ScopedLock locker(messageLock);
+ Mutex::ScopedLock locker(messageLock);
for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) {
- i->payload->forcePersistent();
- if (i->payload->getPersistenceId() == 0){
+ i->payload->forcePersistent();
+ if (i->payload->getPersistenceId() == 0){
enqueue(0, i->payload);
- }
+ }
}
- inLastNodeFailure = true;
- }
+ inLastNodeFailure = true;
+ }
}
// return true if store exists,
bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
{
if (inLastNodeFailure && persistLastNode){
- msg->forcePersistent();
- }
+ msg->forcePersistent();
+ }
- if (traceId.size()) {
+ if (traceId.size()) {
msg->addTraceId(traceId);
}
if (msg->isPersistent() && store) {
- msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
- boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg);
+ msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
+ boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg);
store->enqueue(ctxt, pmsg, *this);
return true;
}
@@ -542,7 +542,7 @@
}
if (msg.payload->isPersistent() && store) {
msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue
- boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload);
+ boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload);
store->dequeue(ctxt, pmsg, *this);
return true;
}
@@ -557,7 +557,7 @@
{
QueuedMessage msg = messages.front();
popMsg(msg);
- dequeue(0, msg);
+ dequeue(0, msg);
}
/**
@@ -593,7 +593,7 @@
optimisticConsume= _settings.get(qpidOptimisticConsume);
if (optimisticConsume) QPID_LOG(debug, "Configured queue with optimistic consume");
- persistLastNode= _settings.get(qpidPersistLastNode);
+ persistLastNode= _settings.get(qpidPersistLastNode);
if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node");
traceId = _settings.getString(qpidTraceIdentity);
@@ -783,7 +783,7 @@
switch (methodId)
{
- case _qmf::Queue::METHOD_PURGE :
+ case _qmf::Queue::METHOD_PURGE :
_qmf::ArgsQueuePurge& iargs = (_qmf::ArgsQueuePurge&) args;
purge (iargs.i_request);
status = Manageable::STATUS_OK;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=703102&r1=703101&r2=703102&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Thu Oct 9 01:36:25 2008
@@ -77,12 +77,12 @@
bool lastValueQueue;
bool optimisticConsume;
bool persistLastNode;
- bool inLastNodeFailure;
+ bool inLastNodeFailure;
std::string traceId;
std::vector<std::string> traceExclude;
Listeners listeners;
Messages messages;
- LVQ lvq;
+ LVQ lvq;
mutable qpid::sys::Mutex consumerLock;
mutable qpid::sys::Mutex messageLock;
mutable qpid::sys::Mutex ownershipLock;
@@ -109,26 +109,29 @@
void dequeued(const QueuedMessage& msg);
void popAndDequeue();
- inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg){
+
+ inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg)
+ {
if (mgmtObject != 0) {
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
if (msg->isPersistent ()) {
- mgmtObject->inc_msgPersistEnqueues ();
- mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
- }
+ mgmtObject->inc_msgPersistEnqueues ();
+ mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
+ }
+ }
+ }
+ inline void mgntDeqStats(const boost::intrusive_ptr<Message>& msg)
+ {
+ if (mgmtObject != 0){
+ mgmtObject->inc_msgTotalDequeues ();
+ mgmtObject->inc_byteTotalDequeues (msg->contentSize());
+ if (msg->isPersistent ()){
+ mgmtObject->inc_msgPersistDequeues ();
+ mgmtObject->inc_bytePersistDequeues (msg->contentSize());
+ }
}
- };
- inline void mgntDeqStats(const boost::intrusive_ptr<Message>& msg){
- if (mgmtObject != 0){
- mgmtObject->inc_msgTotalDequeues ();
- mgmtObject->inc_byteTotalDequeues (msg->contentSize());
- if (msg->isPersistent ()){
- mgmtObject->inc_msgPersistDequeues ();
- mgmtObject->inc_bytePersistDequeues (msg->contentSize());
- }
- }
- };
+ }
public:
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/QueueOptions.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/QueueOptions.cpp?rev=703102&r1=703101&r2=703102&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/QueueOptions.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/QueueOptions.cpp Thu Oct 9 01:36:25 2008
@@ -49,45 +49,45 @@
if (maxCount) setInt(strMaxCountKey, maxCount);
if (maxSize) setInt(strMaxSizeKey, maxSize);
if (maxSize || maxCount){
- switch (sp)
- {
- case REJECT:
- setString(strTypeKey, strREJECT);
- break;
- case FLOW_TO_DISK:
- setString(strTypeKey, strFLOW_TO_DISK);
- break;
- case RING:
- setString(strTypeKey, strRING);
- break;
- case RING_STRICT:
- setString(strTypeKey, strRING_STRICT);
- break;
- case NONE:
- clearSizePolicy();
- break;
- }
- }
+ switch (sp)
+ {
+ case REJECT:
+ setString(strTypeKey, strREJECT);
+ break;
+ case FLOW_TO_DISK:
+ setString(strTypeKey, strFLOW_TO_DISK);
+ break;
+ case RING:
+ setString(strTypeKey, strRING);
+ break;
+ case RING_STRICT:
+ setString(strTypeKey, strRING_STRICT);
+ break;
+ case NONE:
+ clearSizePolicy();
+ break;
+ }
+ }
}
void QueueOptions::setOptimisticConsume()
{
- setInt(strOptimisticConsume, 1);
+ setInt(strOptimisticConsume, 1);
}
void QueueOptions::setPersistLastNode()
{
- setInt(strPersistLastNode, 1);
+ setInt(strPersistLastNode, 1);
}
void QueueOptions::setOrdering(QueueOrderingPolicy op)
{
- if (op == LVQ){
- setInt(strLastValueQueue, 1);
- }else{
- clearOrdering();
- }
+ if (op == LVQ){
+ setInt(strLastValueQueue, 1);
+ }else{
+ clearOrdering();
+ }
}
void QueueOptions::getLVQKey(std::string& key)
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/QueueOptions.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/QueueOptions.h?rev=703102&r1=703101&r2=703102&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/QueueOptions.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/QueueOptions.h Thu Oct 9 01:36:25 2008
@@ -39,76 +39,73 @@
QueueOptions();
virtual ~QueueOptions();
- /**
- * Sets the queue sizing plocy
- *
- * @param sp SizePolicy
- * REJECT - reject if queue greater than size/count
- * FLOW_TO_DISK - page messages to disk from this point is greater than size/count
- * RING - limit the queue to size/count and over-write old messages round a ring
- * RING_STRICT - limit the queue to size/count and reject is head == tail
- * NONE - Use default broker sizing policy
- * @param maxSize Set the max number of bytes for the sizing policies
- * @param setMaxCount Set the max number of messages for the sizing policies
- */
- void setSizePolicy(QueueSizePolicy sp, uint64_t maxSize, uint32_t maxCount );
-
- /**
- * Enables optimistic consume allowing the consumer to dequeue the message before the
- * broker has safe stored it.
- */
- void setOptimisticConsume();
-
- /**
- * Enables the persisting of a queue to the store module when a cluster fails down to it's last
- * node. Does so optimistically. Will start persisting when cluster count >1 again.
- */
- void setPersistLastNode();
-
- /**
- * Sets the odering policy on the Queue, default ordering is FIFO.
- */
- void setOrdering(QueueOrderingPolicy op);
+ /**
+ * Sets the queue sizing plocy
+ *
+ * @param sp SizePolicy
+ * REJECT - reject if queue greater than size/count
+ * FLOW_TO_DISK - page messages to disk from this point is greater than size/count
+ * RING - limit the queue to size/count and over-write old messages round a ring
+ * RING_STRICT - limit the queue to size/count and reject is head == tail
+ * NONE - Use default broker sizing policy
+ * @param maxSize Set the max number of bytes for the sizing policies
+ * @param setMaxCount Set the max number of messages for the sizing policies
+ */
+ void setSizePolicy(QueueSizePolicy sp, uint64_t maxSize, uint32_t maxCount );
+
+ /**
+ * Enables optimistic consume allowing the consumer to dequeue the message before the
+ * broker has safe stored it.
+ */
+ void setOptimisticConsume();
+
+ /**
+ * Enables the persisting of a queue to the store module when a cluster fails down to it's last
+ * node. Does so optimistically. Will start persisting when cluster count >1 again.
+ */
+ void setPersistLastNode();
+
+ /**
+ * Sets the odering policy on the Queue, default ordering is FIFO.
+ */
+ void setOrdering(QueueOrderingPolicy op);
- /**
- * Use broker defualt sizing ploicy
- */
- void clearSizePolicy();
-
- /**
- * Clear Optimistic Consume Policy
- */
- void clearOptimisticConsume();
-
- /**
- * Clear Persist Last Node Policy
- */
- void clearPersistLastNode();
-
- /**
- * get the key used match LVQ in args for message transfer
- */
- void getLVQKey(std::string& key);
+ /**
+ * Use broker defualt sizing ploicy
+ */
+ void clearSizePolicy();
+
+ /**
+ * Clear Optimistic Consume Policy
+ */
+ void clearOptimisticConsume();
+
+ /**
+ * Clear Persist Last Node Policy
+ */
+ void clearPersistLastNode();
+
+ /**
+ * get the key used match LVQ in args for message transfer
+ */
+ void getLVQKey(std::string& key);
- /**
- * Use default odering policy
- */
- void clearOrdering();
-
- static const std::string strMaxCountKey;
- static const std::string strMaxSizeKey;
- static const std::string strTypeKey;
- static const std::string strREJECT;
- static const std::string strFLOW_TO_DISK;
- static const std::string strRING;
- static const std::string strRING_STRICT;
- static const std::string strLastValueQueue;
- static const std::string strOptimisticConsume;
- static const std::string strPersistLastNode;
- static const std::string strLVQMatchProperty;
-
-
-
+ /**
+ * Use default odering policy
+ */
+ void clearOrdering();
+
+ static const std::string strMaxCountKey;
+ static const std::string strMaxSizeKey;
+ static const std::string strTypeKey;
+ static const std::string strREJECT;
+ static const std::string strFLOW_TO_DISK;
+ static const std::string strRING;
+ static const std::string strRING_STRICT;
+ static const std::string strLastValueQueue;
+ static const std::string strOptimisticConsume;
+ static const std::string strPersistLastNode;
+ static const std::string strLVQMatchProperty;
};
}