You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2011/06/21 21:58:07 UTC
svn commit: r1138153 [2/6] - in /qpid/branches/qpid-3079/qpid: ./ cpp/
cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf2/python/
cpp/bindings/qmf2/ruby/ cpp/bindings/qpid/dotnet/src/
cpp/bindings/qpid/python/ cpp/bindings/qpid/ruby/ cpp...
Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.cpp Tue Jun 21 19:58:01 2011
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -65,7 +65,7 @@ using std::mem_fun;
namespace _qmf = qmf::org::apache::qpid::broker;
-namespace
+namespace
{
const std::string qpidMaxSize("qpid.max_size");
const std::string qpidMaxCount("qpid.max_count");
@@ -87,16 +87,16 @@ const int ENQUEUE_ONLY=1;
const int ENQUEUE_AND_DEQUEUE=2;
}
-Queue::Queue(const string& _name, bool _autodelete,
+Queue::Queue(const string& _name, bool _autodelete,
MessageStore* const _store,
const OwnershipToken* const _owner,
Manageable* parent,
Broker* b) :
- name(_name),
+ name(_name),
autodelete(_autodelete),
store(_store),
- owner(_owner),
+ owner(_owner),
consumerCount(0),
exclusive(0),
noLocal(false),
@@ -179,9 +179,9 @@ void Queue::recover(boost::intrusive_ptr
if (policy.get()) policy->recoverEnqueued(msg);
push(msg, true);
- if (store){
+ if (store){
// setup synclist for recovered messages, so they don't get re-stored on lastNodeFailure
- msg->addToSyncList(shared_from_this(), store);
+ msg->addToSyncList(shared_from_this(), store);
}
if (store && (!msg->isContentLoaded() || msg->checkContentReleasable())) {
@@ -206,13 +206,13 @@ void Queue::process(boost::intrusive_ptr
void Queue::requeue(const QueuedMessage& msg){
assertClusterSafe();
QueueListeners::NotificationSet copy;
- {
+ {
Mutex::ScopedLock locker(messageLock);
if (!isEnqueued(msg)) return;
messages->reinsert(msg);
listeners.populate(copy);
- // for persistLastNode - don't force a message twice to disk, but force it if no force before
+ // for persistLastNode - don't force a message twice to disk, but force it if no force before
if(inLastNodeFailure && persistLastNode && !msg.payload->isStoredOnQueue(shared_from_this())) {
msg.payload->forcePersistent();
if (msg.payload->isForcedPersistent() ){
@@ -224,7 +224,7 @@ void Queue::requeue(const QueuedMessage&
copy.notify();
}
-bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
+bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
{
Mutex::ScopedLock locker(messageLock);
assertClusterSafe();
@@ -268,7 +268,7 @@ bool Queue::getNextMessage(QueuedMessage
case NO_MESSAGES:
default:
return false;
- }
+ }
} else {
return browseNextMessage(m, c);
}
@@ -278,7 +278,7 @@ Queue::ConsumeCode Queue::consumeNextMes
{
while (true) {
Mutex::ScopedLock locker(messageLock);
- if (messages->empty()) {
+ if (messages->empty()) {
QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
listeners.addListener(c);
return NO_MESSAGES;
@@ -291,7 +291,7 @@ Queue::ConsumeCode Queue::consumeNextMes
}
if (c->filter(msg.payload)) {
- if (c->accept(msg.payload)) {
+ if (c->accept(msg.payload)) {
m = msg;
pop();
return CONSUMED;
@@ -304,7 +304,7 @@ Queue::ConsumeCode Queue::consumeNextMes
//consumer will never want this message
QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
return CANT_CONSUME;
- }
+ }
}
}
}
@@ -358,7 +358,7 @@ bool Queue::dispatch(Consumer::shared_pt
}
}
-// Find the next message
+// Find the next message
bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) {
Mutex::ScopedLock locker(messageLock);
if (messages->next(c->position, msg)) {
@@ -426,19 +426,25 @@ bool collect_if_expired(std::deque<Queue
}
}
-void Queue::purgeExpired()
+/**
+ *@param lapse: time since the last purgeExpired
+ */
+void Queue::purgeExpired(qpid::sys::Duration lapse)
{
//As expired messages are discarded during dequeue also, only
//bother explicitly expiring if the rate of dequeues since last
- //attempt is less than one per second.
-
- if (dequeueTracker.sampleRatePerSecond() < 1) {
+ //attempt is less than one per second.
+ int count = dequeueSincePurge.get();
+ dequeueSincePurge -= count;
+ int seconds = int64_t(lapse)/qpid::sys::TIME_SEC;
+ if (seconds == 0 || count / seconds < 1) {
std::deque<QueuedMessage> expired;
{
Mutex::ScopedLock locker(messageLock);
- messages->removeIf(boost::bind(&collect_if_expired, expired, _1));
+ messages->removeIf(boost::bind(&collect_if_expired, boost::ref(expired), _1));
}
- for_each(expired.begin(), expired.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
+ for_each(expired.begin(), expired.end(),
+ boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
}
}
@@ -457,7 +463,7 @@ void Queue::purgeExpired()
uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> dest)
{
Mutex::ScopedLock locker(messageLock);
- uint32_t purge_count = purge_request; // only comes into play if >0
+ uint32_t purge_count = purge_request; // only comes into play if >0
std::deque<DeliverableMessage> rerouteQueue;
uint32_t count = 0;
@@ -490,7 +496,7 @@ uint32_t Queue::purge(const uint32_t pur
uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) {
Mutex::ScopedLock locker(messageLock);
- uint32_t move_count = qty; // only comes into play if qty >0
+ uint32_t move_count = qty; // only comes into play if qty >0
uint32_t count = 0; // count how many were moved for returning
while((!qty || move_count--) && !messages->empty()) {
@@ -508,7 +514,7 @@ void Queue::pop()
{
assertClusterSafe();
messages->pop();
- ++dequeueTracker;
+ ++dequeueSincePurge;
}
void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
@@ -517,10 +523,10 @@ void Queue::push(boost::intrusive_ptr<Me
QueuedMessage removed;
bool dequeueRequired = false;
{
- Mutex::ScopedLock locker(messageLock);
+ Mutex::ScopedLock locker(messageLock);
QueuedMessage qm(this, msg, ++sequence);
if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence);
-
+
dequeueRequired = messages->push(qm, removed);
listeners.populate(copy);
enqueued(qm);
@@ -599,7 +605,7 @@ void Queue::setLastNodeFailure()
}
-// return true if store exists,
+// return true if store exists,
bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg, bool suppressPolicyCheck)
{
ScopedUse u(barrier);
@@ -619,7 +625,7 @@ bool Queue::enqueue(TransactionContext*
if (inLastNodeFailure && persistLastNode){
msg->forcePersistent();
}
-
+
if (traceId.size()) {
//copy on write: take deep copy of message before modifying it
//as the frames may already be available for delivery on other
@@ -649,9 +655,10 @@ bool Queue::enqueue(TransactionContext*
void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg)
{
Mutex::ScopedLock locker(messageLock);
- if (policy.get()) policy->enqueueAborted(msg);
+ if (policy.get()) policy->enqueueAborted(msg);
}
+
/**
* Returns a null pointer if the dequeue completed, otherwise the dequeue will complete
* asynchronously, and a pointer to a DequeueCompletion object is returned.
@@ -666,7 +673,7 @@ Queue::dequeue(TransactionContext* ctxt,
{
Mutex::ScopedLock locker(messageLock);
if (!isEnqueued(msg)) return empty;
- if (!ctxt) {
+ if (!ctxt) {
dequeued(msg);
}
}
@@ -693,7 +700,7 @@ Queue::dequeue(TransactionContext* ctxt,
void Queue::dequeueCommitted(const QueuedMessage& msg)
{
Mutex::ScopedLock locker(messageLock);
- dequeued(msg);
+ dequeued(msg);
if (mgmtObject != 0) {
mgmtObject->inc_msgTxnDequeues();
mgmtObject->inc_byteTxnDequeues(msg.payload->contentSize());
@@ -748,8 +755,8 @@ int getIntegerSetting(const qpid::framin
return v->get<int>();
} else if (v->convertsTo<std::string>()){
std::string s = v->get<std::string>();
- try {
- return boost::lexical_cast<int>(s);
+ try {
+ return boost::lexical_cast<int>(s);
} catch(const boost::bad_lexical_cast&) {
QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << s);
return 0;
@@ -773,7 +780,7 @@ void Queue::configureImpl(const FieldTab
broker->getQueueEvents().observe(*this, eventMode == ENQUEUE_ONLY);
}
- if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK &&
+ if (QueuePolicy::getType(_settings) == QueuePolicy::FLOW_TO_DISK &&
(!store || NullMessageStore::isNullStore(store) || (broker && !(broker->getQueueEvents().isSync())) )) {
if ( NullMessageStore::isNullStore(store)) {
QPID_LOG(warning, "Flow to disk not valid for non-persisted queue:" << getName());
@@ -811,7 +818,7 @@ void Queue::configureImpl(const FieldTab
QPID_LOG(debug, "Configured queue " << getName() << " as priority queue.");
}
}
-
+
persistLastNode= _settings.get(qpidPersistLastNode);
if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node for: " << getName());
@@ -820,15 +827,15 @@ void Queue::configureImpl(const FieldTab
if (excludeList.size()) {
split(traceExclude, excludeList, ", ");
}
- QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId
+ QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId
<< "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << traceExclude.size() << " elements");
FieldTable::ValuePtr p =_settings.get(qpidInsertSequenceNumbers);
if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>());
autoDeleteTimeout = getIntegerSetting(_settings, qpidAutoDeleteTimeout);
- if (autoDeleteTimeout)
- QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout);
+ if (autoDeleteTimeout)
+ QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout);
if (mgmtObject != 0) {
mgmtObject->set_arguments(ManagementAgent::toMap(_settings));
@@ -892,9 +899,9 @@ const QueuePolicy* Queue::getPolicy()
return policy.get();
}
-uint64_t Queue::getPersistenceId() const
-{
- return persistenceId;
+uint64_t Queue::getPersistenceId() const
+{
+ return persistenceId;
}
void Queue::setPersistenceId(uint64_t _persistenceId) const
@@ -908,11 +915,11 @@ void Queue::setPersistenceId(uint64_t _p
persistenceId = _persistenceId;
}
-void Queue::encode(Buffer& buffer) const
+void Queue::encode(Buffer& buffer) const
{
buffer.putShortString(name);
buffer.put(settings);
- if (policy.get()) {
+ if (policy.get()) {
buffer.put(*policy);
}
buffer.putShortString(alternateExchange.get() ? alternateExchange->getName() : std::string(""));
@@ -965,7 +972,7 @@ boost::shared_ptr<Exchange> Queue::getAl
void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue)
{
- if (broker.getQueues().destroyIf(queue->getName(),
+ if (broker.getQueues().destroyIf(queue->getName(),
boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) {
QPID_LOG(debug, "Auto-deleting " << queue->getName());
queue->destroyed();
@@ -977,7 +984,7 @@ struct AutoDeleteTask : qpid::sys::Timer
Broker& broker;
Queue::shared_ptr queue;
- AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime)
+ AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime)
: qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion"), broker(b), queue(q) {}
void fire()
@@ -995,27 +1002,27 @@ void Queue::tryAutoDelete(Broker& broker
if (queue->autoDeleteTimeout && queue->canAutoDelete()) {
AbsTime time(now(), Duration(queue->autoDeleteTimeout * TIME_SEC));
queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(broker, queue, time));
- broker.getClusterTimer().add(queue->autoDeleteTask);
+ broker.getClusterTimer().add(queue->autoDeleteTask);
QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " initiated");
} else {
tryAutoDeleteImpl(broker, queue);
}
}
-bool Queue::isExclusiveOwner(const OwnershipToken* const o) const
-{
+bool Queue::isExclusiveOwner(const OwnershipToken* const o) const
+{
Mutex::ScopedLock locker(ownershipLock);
- return o == owner;
+ return o == owner;
}
-void Queue::releaseExclusiveOwnership()
-{
+void Queue::releaseExclusiveOwnership()
+{
Mutex::ScopedLock locker(ownershipLock);
- owner = 0;
+ owner = 0;
}
-bool Queue::setExclusiveOwner(const OwnershipToken* const o)
-{
+bool Queue::setExclusiveOwner(const OwnershipToken* const o)
+{
//reset auto deletion timer if necessary
if (autoDeleteTimeout && autoDeleteTask) {
autoDeleteTask->cancel();
@@ -1024,20 +1031,20 @@ bool Queue::setExclusiveOwner(const Owne
if (owner) {
return false;
} else {
- owner = o;
+ owner = o;
return true;
}
}
-bool Queue::hasExclusiveOwner() const
-{
+bool Queue::hasExclusiveOwner() const
+{
Mutex::ScopedLock locker(ownershipLock);
- return owner != 0;
+ return owner != 0;
}
-bool Queue::hasExclusiveConsumer() const
-{
- return exclusive;
+bool Queue::hasExclusiveConsumer() const
+{
+ return exclusive;
}
void Queue::setExternalQueueStore(ExternalQueueStore* inst) {
@@ -1206,6 +1213,10 @@ const Broker* Queue::getBroker()
return broker;
}
+void Queue::setDequeueSincePurge(uint32_t value) {
+ dequeueSincePurge = value;
+}
+
/** invoked from the store thread when the asynchronous dequeueing of the
* message has completed. */
Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.h?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/Queue.h Tue Jun 21 19:58:01 2011
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -32,9 +32,9 @@
#include "qpid/broker/QueueBindings.h"
#include "qpid/broker/QueueListeners.h"
#include "qpid/broker/QueueObserver.h"
-#include "qpid/broker/RateTracker.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/sys/AtomicValue.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Timer.h"
#include "qpid/management/Manageable.h"
@@ -75,13 +75,13 @@ class Queue : public boost::enable_share
{
Queue& parent;
uint count;
-
+
UsageBarrier(Queue&);
bool acquire();
void release();
void destroy();
};
-
+
struct ScopedUse
{
UsageBarrier& barrier;
@@ -89,7 +89,7 @@ class Queue : public boost::enable_share
ScopedUse(UsageBarrier& b) : barrier(b), acquired(barrier.acquire()) {}
~ScopedUse() { if (acquired) barrier.release(); }
};
-
+
typedef std::set< boost::shared_ptr<QueueObserver> > Observers;
enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2};
@@ -120,7 +120,7 @@ class Queue : public boost::enable_share
boost::shared_ptr<Exchange> alternateExchange;
framing::SequenceNumber sequence;
qmf::org::apache::qpid::broker::Queue* mgmtObject;
- RateTracker dequeueTracker;
+ sys::AtomicValue<uint32_t> dequeueSincePurge; // Count dequeues since last purge.
int eventMode;
Observers observers;
bool insertSeqNo;
@@ -147,7 +147,7 @@ class Queue : public boost::enable_share
void dequeued(const QueuedMessage& msg);
void pop();
void popAndDequeue();
- QueuedMessage getFront();
+
void forcePersistent(QueuedMessage& msg);
int getEventMode();
void configureImpl(const qpid::framing::FieldTable& settings);
@@ -185,8 +185,8 @@ class Queue : public boost::enable_share
typedef std::vector<shared_ptr> vector;
QPID_BROKER_EXTERN Queue(const std::string& name,
- bool autodelete = false,
- MessageStore* const store = 0,
+ bool autodelete = false,
+ MessageStore* const store = 0,
const OwnershipToken* const owner = 0,
management::Manageable* parent = 0,
Broker* broker = 0);
@@ -246,11 +246,11 @@ class Queue : public boost::enable_share
bool exclusive = false);
QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c);
- uint32_t purge(const uint32_t purge_request=0, boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>()); //defaults to all messages
- QPID_BROKER_EXTERN void purgeExpired();
+ uint32_t purge(const uint32_t purge_request=0, boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>()); //defaults to all messages
+ QPID_BROKER_EXTERN void purgeExpired(sys::Duration);
//move qty # of messages to destination Queue destq
- uint32_t move(const Queue::shared_ptr destq, uint32_t qty);
+ uint32_t move(const Queue::shared_ptr destq, uint32_t qty);
QPID_BROKER_EXTERN uint32_t getMessageCount() const;
QPID_BROKER_EXTERN uint32_t getEnqueueCompleteMessageCount() const;
@@ -313,8 +313,8 @@ class Queue : public boost::enable_share
* Inform queue of messages that were enqueued, have since
* been acquired but not yet accepted or released (and
* thus are still logically on the queue) - used in
- * clustered broker.
- */
+ * clustered broker.
+ */
void updateEnqueued(const QueuedMessage& msg);
/**
@@ -325,9 +325,9 @@ class Queue : public boost::enable_share
* accepted it).
*/
bool isEnqueued(const QueuedMessage& msg);
-
+
/**
- * Gets the next available message
+ * Gets the next available message
*/
QPID_BROKER_EXTERN QueuedMessage get();
@@ -408,6 +408,9 @@ class Queue : public boost::enable_share
const Broker* getBroker();
+ uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); }
+ void setDequeueSincePurge(uint32_t value);
+
private:
std::map<PersistableMessage *, boost::intrusive_ptr<DequeueCompletion> > pendingDequeueCompletions;
};
Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/QueueCleaner.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/QueueCleaner.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/QueueCleaner.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/QueueCleaner.cpp Tue Jun 21 19:58:01 2011
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,7 +27,7 @@
namespace qpid {
namespace broker {
-QueueCleaner::QueueCleaner(QueueRegistry& q, sys::Timer& t) : queues(q), timer(t) {}
+QueueCleaner::QueueCleaner(QueueRegistry& q, sys::Timer* t) : queues(q), timer(t) {}
QueueCleaner::~QueueCleaner()
{
@@ -36,10 +36,16 @@ QueueCleaner::~QueueCleaner()
void QueueCleaner::start(qpid::sys::Duration p)
{
+ period = p;
task = new Task(*this, p);
- timer.add(task);
+ timer->add(task);
}
+void QueueCleaner::setTimer(qpid::sys::Timer* timer) {
+ this->timer = timer;
+}
+
+
QueueCleaner::Task::Task(QueueCleaner& p, qpid::sys::Duration d) : sys::TimerTask(d,"QueueCleaner"), parent(p) {}
void QueueCleaner::Task::fire()
@@ -65,9 +71,9 @@ void QueueCleaner::fired()
std::vector<Queue::shared_ptr> copy;
CollectQueues collect(©);
queues.eachQueue(collect);
- std::for_each(copy.begin(), copy.end(), boost::bind(&Queue::purgeExpired, _1));
+ std::for_each(copy.begin(), copy.end(), boost::bind(&Queue::purgeExpired, _1, period));
task->setupNextFire();
- timer.add(task);
+ timer->add(task);
}
Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/QueueCleaner.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/QueueCleaner.h?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/QueueCleaner.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/QueueCleaner.h Tue Jun 21 19:58:01 2011
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -35,14 +35,15 @@ class QueueRegistry;
class QueueCleaner
{
public:
- QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, sys::Timer& timer);
+ QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, sys::Timer* timer);
QPID_BROKER_EXTERN ~QueueCleaner();
- QPID_BROKER_EXTERN void start(qpid::sys::Duration period);
+ QPID_BROKER_EXTERN void start(sys::Duration period);
+ QPID_BROKER_EXTERN void setTimer(sys::Timer* timer);
private:
class Task : public sys::TimerTask
{
public:
- Task(QueueCleaner& parent, qpid::sys::Duration duration);
+ Task(QueueCleaner& parent, sys::Duration duration);
void fire();
private:
QueueCleaner& parent;
@@ -50,7 +51,8 @@ class QueueCleaner
boost::intrusive_ptr<sys::TimerTask> task;
QueueRegistry& queues;
- sys::Timer& timer;
+ sys::Timer* timer;
+ sys::Duration period;
void fired();
};
Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp Tue Jun 21 19:58:01 2011
@@ -167,7 +167,8 @@ void QueueFlowLimit::enqueued(const Queu
msg.payload->getIngressCompletion().startCompleter(); // don't complete until flow resumes
bool unique;
unique = index.insert(std::pair<framing::SequenceNumber, boost::intrusive_ptr<Message> >(msg.position, msg.payload)).second;
- assert(unique);
+ // Like this to avoid tripping up unused variable warning when NDEBUG set
+ if (!unique) assert(unique);
}
}
@@ -379,7 +380,8 @@ void QueueFlowLimit::setState(const qpid
QueuedMessage msg(queue->find(seq)); // fyi: msg.payload may be null if msg is delivered & unacked
bool unique;
unique = index.insert(std::pair<framing::SequenceNumber, boost::intrusive_ptr<Message> >(seq, msg.payload)).second;
- assert(unique);
+ // Like this to avoid tripping up unused variable warning when NDEBUG set
+ if (!unique) assert(unique);
}
}
}
Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/QueuePolicy.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/QueuePolicy.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/QueuePolicy.cpp Tue Jun 21 19:58:01 2011
@@ -117,19 +117,20 @@ void QueuePolicy::update(FieldTable& set
settings.setString(typeKey, type);
}
-uint32_t QueuePolicy::getCapacity(const FieldTable& settings, const std::string& key, uint32_t defaultValue)
+template <typename T>
+T getCapacity(const FieldTable& settings, const std::string& key, T defaultValue)
{
FieldTable::ValuePtr v = settings.get(key);
- int32_t result = 0;
+ T result = 0;
if (!v) return defaultValue;
if (v->getType() == 0x23) {
QPID_LOG(debug, "Value for " << key << " specified as float: " << v->get<float>());
} else if (v->getType() == 0x33) {
QPID_LOG(debug, "Value for " << key << " specified as double: " << v->get<double>());
- } else if (v->convertsTo<int>()) {
- result = v->get<int>();
+ } else if (v->convertsTo<T>()) {
+ result = v->get<T>();
QPID_LOG(debug, "Got integer value for " << key << ": " << result);
if (result >= 0) return result;
} else if (v->convertsTo<string>()) {
@@ -319,8 +320,8 @@ std::auto_ptr<QueuePolicy> QueuePolicy::
std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings)
{
- uint32_t maxCount = getCapacity(settings, maxCountKey, 0);
- uint32_t maxSize = getCapacity(settings, maxSizeKey, defaultMaxSize);
+ uint32_t maxCount = getCapacity<int32_t>(settings, maxCountKey, 0);
+ uint64_t maxSize = getCapacity<int64_t>(settings, maxSizeKey, defaultMaxSize);
if (maxCount || maxSize) {
return createQueuePolicy(name, maxCount, maxSize, getType(settings));
} else {
Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/QueuePolicy.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/QueuePolicy.h?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/QueuePolicy.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/QueuePolicy.h Tue Jun 21 19:58:01 2011
@@ -43,8 +43,7 @@ class QueuePolicy
uint32_t count;
uint64_t size;
bool policyExceeded;
-
- static uint32_t getCapacity(const qpid::framing::FieldTable& settings, const std::string& key, uint32_t defaultValue);
+
protected:
uint64_t getCurrentQueueSize() const { return size; }
Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/SemanticState.cpp Tue Jun 21 19:58:01 2011
@@ -70,7 +70,7 @@ SemanticState::SemanticState(DeliveryAda
deliveryAdapter(da),
tagGenerator("sgen"),
dtxSelected(false),
- authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isFederationLink()),
+ authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isUserProxyAuth()),
userID(getSession().getConnection().getUserId()),
userName(getSession().getConnection().getUserId().substr(0,getSession().getConnection().getUserId().find('@'))),
isDefaultRealm(userID.find('@') != std::string::npos && getSession().getBroker().getOptions().realm == userID.substr(userID.find('@')+1,userID.size())),
@@ -469,7 +469,6 @@ void SemanticState::route(intrusive_ptr<
/* verify the userid if specified: */
std::string id =
msg->hasProperties<MessageProperties>() ? msg->getProperties<MessageProperties>()->getUserId() : nullstring;
-
if (authMsg && !id.empty() && !(id == userID || (isDefaultRealm && id == userName)))
{
QPID_LOG(debug, "authorised user id : " << userID << " but user id in message declared as " << id);
Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp Tue Jun 21 19:58:01 2011
@@ -81,12 +81,11 @@ class SslProtocolFactory : public qpid::
SslProtocolFactory(const SslServerOptions&, int backlog, bool nodelay);
~SslProtocolFactory();
void accept(sys::Poller::shared_ptr, sys::ConnectionCodec::Factory*);
- void connect(sys::Poller::shared_ptr, const std::string& host, int16_t port,
+ void connect(sys::Poller::shared_ptr, const std::string& host, const std::string& port,
sys::ConnectionCodec::Factory*,
ConnectFailedCallback failed);
uint16_t getPort() const;
- std::string getHost() const;
bool supports(const std::string& capability);
private:
@@ -130,7 +129,7 @@ SslProtocolFactory::SslProtocolFactory(c
int backlog,
bool nodelay)
: tcpNoDelay(nodelay),
- listeningPort(listener.listen(options.port, backlog)),
+ listeningPort(listener.listen("", boost::lexical_cast<std::string>(options.port), backlog)),
clientAuthSelected(options.clientAuth) {
SecInvalidateHandle(&credHandle);
@@ -237,10 +236,6 @@ uint16_t SslProtocolFactory::getPort() c
return listeningPort; // Immutable no need for lock.
}
-std::string SslProtocolFactory::getHost() const {
- return listener.getSockname();
-}
-
void SslProtocolFactory::accept(sys::Poller::shared_ptr poller,
sys::ConnectionCodec::Factory* fact) {
acceptor.reset(
@@ -251,7 +246,7 @@ void SslProtocolFactory::accept(sys::Pol
void SslProtocolFactory::connect(sys::Poller::shared_ptr poller,
const std::string& host,
- int16_t port,
+ const std::string& port,
sys::ConnectionCodec::Factory* fact,
ConnectFailedCallback failed)
{
Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Tue Jun 21 19:58:01 2011
@@ -36,6 +36,7 @@
#include <boost/bind.hpp>
#include <boost/format.hpp>
+#include <boost/lexical_cast.hpp>
#include <boost/shared_ptr.hpp>
#include <limits>
@@ -258,16 +259,16 @@ void ConnectionImpl::open()
connector->setInputHandler(&handler);
connector->setShutdownHandler(this);
try {
- connector->connect(host, port);
-
+ std::string p = boost::lexical_cast<std::string>(port);
+ connector->connect(host, p);
+
} catch (const std::exception& e) {
QPID_LOG(debug, "Failed to connect to " << protocol << ":" << host << ":" << port << " " << e.what());
connector.reset();
throw;
}
connector->init();
- QPID_LOG(info, *this << " connected to " << protocol << ":" << host << ":" << port);
-
+
// Enable heartbeat if requested
uint16_t heartbeat = static_cast<ConnectionSettings&>(handler).heartbeat;
if (heartbeat) {
@@ -281,6 +282,7 @@ void ConnectionImpl::open()
// - in that case in connector.reset() above;
// - or when we are deleted
handler.waitForOpen();
+ QPID_LOG(info, *this << " connected to " << protocol << ":" << host << ":" << port);
// If the SASL layer has provided an "operational" userId for the connection,
// put it in the negotiated settings.
Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/Connector.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/Connector.h?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/Connector.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/Connector.h Tue Jun 21 19:58:01 2011
@@ -61,7 +61,7 @@ class Connector : public framing::Output
static void registerFactory(const std::string& proto, Factory* connectorFactory);
virtual ~Connector() {};
- virtual void connect(const std::string& host, int port) = 0;
+ virtual void connect(const std::string& host, const std::string& port) = 0;
virtual void init() {};
virtual void close() = 0;
virtual void send(framing::AMQFrame& frame) = 0;
Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/RdmaConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/RdmaConnector.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/RdmaConnector.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/RdmaConnector.cpp Tue Jun 21 19:58:01 2011
@@ -95,7 +95,7 @@ class RdmaConnector : public Connector,
std::string identifier;
- void connect(const std::string& host, int port);
+ void connect(const std::string& host, const std::string& port);
void close();
void send(framing::AMQFrame& frame);
void abort() {} // TODO: need to fix this for heartbeat timeouts to work
@@ -173,7 +173,7 @@ RdmaConnector::~RdmaConnector() {
}
}
-void RdmaConnector::connect(const std::string& host, int port){
+void RdmaConnector::connect(const std::string& host, const std::string& port){
Mutex::ScopedLock l(dataConnectedLock);
assert(!dataConnected);
@@ -184,7 +184,7 @@ void RdmaConnector::connect(const std::s
boost::bind(&RdmaConnector::disconnected, this),
boost::bind(&RdmaConnector::rejected, this, poller, _1, _2));
- SocketAddress sa(host, boost::lexical_cast<std::string>(port));
+ SocketAddress sa(host, port);
acon->start(poller, sa);
}
Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/SslConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/SslConnector.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/SslConnector.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/SslConnector.cpp Tue Jun 21 19:58:01 2011
@@ -114,7 +114,7 @@ class SslConnector : public Connector
std::string identifier;
- void connect(const std::string& host, int port);
+ void connect(const std::string& host, const std::string& port);
void init();
void close();
void send(framing::AMQFrame& frame);
@@ -190,7 +190,7 @@ SslConnector::~SslConnector() {
close();
}
-void SslConnector::connect(const std::string& host, int port){
+void SslConnector::connect(const std::string& host, const std::string& port){
Mutex::ScopedLock l(closedLock);
assert(closed);
try {
Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/TCPConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/TCPConnector.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/TCPConnector.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/TCPConnector.cpp Tue Jun 21 19:58:01 2011
@@ -88,7 +88,7 @@ TCPConnector::~TCPConnector() {
close();
}
-void TCPConnector::connect(const std::string& host, int port) {
+void TCPConnector::connect(const std::string& host, const std::string& port) {
Mutex::ScopedLock l(lock);
assert(closed);
connector = AsynchConnector::create(
@@ -121,7 +121,7 @@ void TCPConnector::start(sys::AsynchIO*
aio->queueReadBuffer(new Buff(maxFrameSize));
}
- identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
+ identifier = str(format("[%1%]") % socket.getFullAddress());
}
void TCPConnector::initAmqp() {
Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/TCPConnector.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/TCPConnector.h?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/TCPConnector.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/TCPConnector.h Tue Jun 21 19:58:01 2011
@@ -98,7 +98,7 @@ class TCPConnector : public Connector, p
protected:
virtual ~TCPConnector();
- void connect(const std::string& host, int port);
+ void connect(const std::string& host, const std::string& port);
void start(sys::AsynchIO* aio_);
void initAmqp();
virtual void connectFailed(const std::string& msg);
Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.cpp Tue Jun 21 19:58:01 2011
@@ -30,12 +30,23 @@ void AcceptTracker::State::accept()
unaccepted.clear();
}
-void AcceptTracker::State::accept(qpid::framing::SequenceNumber id)
+SequenceSet AcceptTracker::State::accept(qpid::framing::SequenceNumber id, bool cumulative)
{
- if (unaccepted.contains(id)) {
- unaccepted.remove(id);
- unconfirmed.add(id);
+ SequenceSet accepting;
+ if (cumulative) {
+ for (SequenceSet::iterator i = unaccepted.begin(); i != unaccepted.end() && *i <= id; ++i) {
+ accepting.add(*i);
+ }
+ unconfirmed.add(accepting);
+ unaccepted.remove(accepting);
+ } else {
+ if (unaccepted.contains(id)) {
+ unaccepted.remove(id);
+ unconfirmed.add(id);
+ accepting.add(id);
+ }
}
+ return accepting;
}
void AcceptTracker::State::release()
@@ -71,16 +82,15 @@ void AcceptTracker::accept(qpid::client:
aggregateState.accept();
}
-void AcceptTracker::accept(qpid::framing::SequenceNumber id, qpid::client::AsyncSession& session)
+void AcceptTracker::accept(qpid::framing::SequenceNumber id, qpid::client::AsyncSession& session, bool cumulative)
{
for (StateMap::iterator i = destinationState.begin(); i != destinationState.end(); ++i) {
- i->second.accept(id);
+ i->second.accept(id, cumulative);
}
Record record;
- record.accepted.add(id);
+ record.accepted = aggregateState.accept(id, cumulative);
record.status = session.messageAccept(record.accepted);
pending.push_back(record);
- aggregateState.accept(id);
}
void AcceptTracker::release(qpid::client::AsyncSession& session)
Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/AcceptTracker.h Tue Jun 21 19:58:01 2011
@@ -42,7 +42,7 @@ class AcceptTracker
public:
void delivered(const std::string& destination, const qpid::framing::SequenceNumber& id);
void accept(qpid::client::AsyncSession&);
- void accept(qpid::framing::SequenceNumber, qpid::client::AsyncSession&);
+ void accept(qpid::framing::SequenceNumber, qpid::client::AsyncSession&, bool cumulative);
void release(qpid::client::AsyncSession&);
uint32_t acceptsPending();
uint32_t acceptsPending(const std::string& destination);
@@ -62,7 +62,7 @@ class AcceptTracker
qpid::framing::SequenceSet unconfirmed;
void accept();
- void accept(qpid::framing::SequenceNumber);
+ qpid::framing::SequenceSet accept(qpid::framing::SequenceNumber, bool cumulative);
void release();
uint32_t acceptsPending();
void completed(qpid::framing::SequenceSet&);
Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp Tue Jun 21 19:58:01 2011
@@ -233,6 +233,8 @@ class Subscription : public Exchange, pu
const bool reliable;
const bool durable;
const std::string actualType;
+ const bool exclusiveQueue;
+ const bool exclusiveSubscription;
FieldTable queueOptions;
FieldTable subscriptionOptions;
Bindings bindings;
@@ -307,6 +309,7 @@ struct Opt
Opt& operator/(const std::string& name);
operator bool() const;
std::string str() const;
+ bool asBool(bool defaultValue) const;
const Variant::List& asList() const;
void collect(qpid::framing::FieldTable& args) const;
@@ -338,6 +341,12 @@ Opt::operator bool() const
return value && !value->isVoid() && value->asBool();
}
+bool Opt::asBool(bool defaultValue) const
+{
+ if (value) return value->asBool();
+ else return defaultValue;
+}
+
std::string Opt::str() const
{
if (value) return value->asString();
@@ -490,7 +499,9 @@ Subscription::Subscription(const Address
queue(getSubscriptionName(name, (Opt(address)/LINK/NAME).str())),
reliable(AddressResolution::is_reliable(address)),
durable(Opt(address)/LINK/DURABLE),
- actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type)
+ actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type),
+ exclusiveQueue((Opt(address)/LINK/X_DECLARE/EXCLUSIVE).asBool(true)),
+ exclusiveSubscription((Opt(address)/LINK/X_SUBSCRIBE/EXCLUSIVE).asBool(exclusiveQueue))
{
(Opt(address)/LINK/X_DECLARE/ARGUMENTS).collect(queueOptions);
(Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(subscriptionOptions);
@@ -550,7 +561,7 @@ void Subscription::subscribe(qpid::clien
checkAssert(session, FOR_RECEIVER);
//create subscription queue:
- session.queueDeclare(arg::queue=queue, arg::exclusive=true,
+ session.queueDeclare(arg::queue=queue, arg::exclusive=exclusiveQueue,
arg::autoDelete=!reliable, arg::durable=durable, arg::arguments=queueOptions);
//'default' binding:
bindings.bind(session);
@@ -559,15 +570,15 @@ void Subscription::subscribe(qpid::clien
linkBindings.bind(session);
//subscribe to subscription queue:
AcceptMode accept = reliable ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE;
- session.messageSubscribe(arg::queue=queue, arg::destination=destination,
- arg::exclusive=true, arg::acceptMode=accept, arg::arguments=subscriptionOptions);
+ session.messageSubscribe(arg::queue=queue, arg::destination=destination,
+ arg::exclusive=exclusiveSubscription, arg::acceptMode=accept, arg::arguments=subscriptionOptions);
}
void Subscription::cancel(qpid::client::AsyncSession& session, const std::string& destination)
{
linkBindings.unbind(session);
session.messageCancel(destination);
- session.queueDelete(arg::queue=queue);
+ if (reliable) session.queueDelete(arg::queue=queue, arg::ifUnused=true);
checkDelete(session, FOR_RECEIVER);
}
Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp Tue Jun 21 19:58:01 2011
@@ -40,11 +40,15 @@ using qpid::types::VAR_LIST;
using qpid::framing::Uuid;
namespace {
-void convert(const Variant::List& from, std::vector<std::string>& to)
+void merge(const std::string& value, std::vector<std::string>& list) {
+ if (std::find(list.begin(), list.end(), value) == list.end())
+ list.push_back(value);
+}
+
+void merge(const Variant::List& from, std::vector<std::string>& to)
{
- for (Variant::List::const_iterator i = from.begin(); i != from.end(); ++i) {
- to.push_back(i->asString());
- }
+ for (Variant::List::const_iterator i = from.begin(); i != from.end(); ++i)
+ merge(i->asString(), to);
}
std::string asString(const std::vector<std::string>& v) {
@@ -93,9 +97,9 @@ void ConnectionImpl::setOption(const std
maxReconnectInterval = value;
} else if (name == "reconnect-urls" || name == "reconnect_urls") {
if (value.getType() == VAR_LIST) {
- convert(value.asList(), urls);
+ merge(value.asList(), urls);
} else {
- urls.push_back(value.asString());
+ merge(value.asString(), urls);
}
} else if (name == "username") {
settings.username = value.asString();
@@ -198,7 +202,7 @@ qpid::messaging::Session ConnectionImpl:
sessions[name] = impl;
break;
} catch (const qpid::TransportFailure&) {
- open();
+ reopen();
} catch (const qpid::SessionException& e) {
throw qpid::messaging::SessionError(e.what());
} catch (const std::exception& e) {
@@ -219,6 +223,15 @@ void ConnectionImpl::open()
catch (const qpid::Exception& e) { throw messaging::ConnectionError(e.what()); }
}
+void ConnectionImpl::reopen()
+{
+ if (!reconnect) {
+ throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)");
+ }
+ open();
+}
+
+
bool expired(const qpid::sys::AbsTime& start, int64_t timeout)
{
if (timeout == 0) return true;
@@ -246,14 +259,9 @@ void ConnectionImpl::connect(const qpid:
}
void ConnectionImpl::mergeUrls(const std::vector<Url>& more, const sys::Mutex::ScopedLock&) {
- if (more.size()) {
- for (size_t i = 0; i < more.size(); ++i) {
- if (std::find(urls.begin(), urls.end(), more[i].str()) == urls.end()) {
- urls.push_back(more[i].str());
- }
- }
- QPID_LOG(debug, "Added known-hosts, reconnect-urls=" << asString(urls));
- }
+ for (std::vector<Url>::const_iterator i = more.begin(); i != more.end(); ++i)
+ merge(i->str(), urls);
+ QPID_LOG(debug, "Added known-hosts, reconnect-urls=" << asString(urls));
}
bool ConnectionImpl::tryConnect()
Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h Tue Jun 21 19:58:01 2011
@@ -43,6 +43,7 @@ class ConnectionImpl : public qpid::mess
public:
ConnectionImpl(const std::string& url, const qpid::types::Variant::Map& options);
void open();
+ void reopen();
bool isOpen() const;
void close();
qpid::messaging::Session newSession(bool transactional, const std::string& name);
Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp Tue Jun 21 19:58:01 2011
@@ -144,10 +144,10 @@ void IncomingMessages::accept()
acceptTracker.accept(session);
}
-void IncomingMessages::accept(qpid::framing::SequenceNumber id)
+void IncomingMessages::accept(qpid::framing::SequenceNumber id, bool cumulative)
{
sys::Mutex::ScopedLock l(lock);
- acceptTracker.accept(id, session);
+ acceptTracker.accept(id, session, cumulative);
}
Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h Tue Jun 21 19:58:01 2011
@@ -72,7 +72,7 @@ class IncomingMessages
bool get(Handler& handler, qpid::sys::Duration timeout);
bool getNextDestination(std::string& destination, qpid::sys::Duration timeout);
void accept();
- void accept(qpid::framing::SequenceNumber id);
+ void accept(qpid::framing::SequenceNumber id, bool cumulative);
void releaseAll();
void releasePending(const std::string& destination);
Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp Tue Jun 21 19:58:01 2011
@@ -112,13 +112,14 @@ void SessionImpl::release(qpid::messagin
execute1<Release>(m);
}
-void SessionImpl::acknowledge(qpid::messaging::Message& m)
+void SessionImpl::acknowledge(qpid::messaging::Message& m, bool cumulative)
{
//Should probably throw an exception on failure here, or indicate
//it through a return type at least. Failure means that the
//message may be redelivered; i.e. the application cannot delete
//any state necessary for preventing reprocessing of the message
- execute1<Acknowledge1>(m);
+ Acknowledge2 ack(*this, m, cumulative);
+ execute(ack);
}
void SessionImpl::close()
@@ -467,10 +468,10 @@ void SessionImpl::acknowledgeImpl()
if (!transactional) incoming.accept();
}
-void SessionImpl::acknowledgeImpl(qpid::messaging::Message& m)
+void SessionImpl::acknowledgeImpl(qpid::messaging::Message& m, bool cumulative)
{
ScopedLock l(lock);
- if (!transactional) incoming.accept(MessageImplAccess::get(m).getInternalId());
+ if (!transactional) incoming.accept(MessageImplAccess::get(m).getInternalId(), cumulative);
}
void SessionImpl::rejectImpl(qpid::messaging::Message& m)
@@ -509,7 +510,7 @@ void SessionImpl::senderCancelled(const
void SessionImpl::reconnect()
{
- connection->open();
+ connection->reopen();
}
bool SessionImpl::backoff()
Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h Tue Jun 21 19:58:01 2011
@@ -63,7 +63,7 @@ class SessionImpl : public qpid::messagi
void acknowledge(bool sync);
void reject(qpid::messaging::Message&);
void release(qpid::messaging::Message&);
- void acknowledge(qpid::messaging::Message& msg);
+ void acknowledge(qpid::messaging::Message& msg, bool cumulative);
void close();
void sync(bool block);
qpid::messaging::Sender createSender(const qpid::messaging::Address& address);
@@ -139,7 +139,7 @@ class SessionImpl : public qpid::messagi
void commitImpl();
void rollbackImpl();
void acknowledgeImpl();
- void acknowledgeImpl(qpid::messaging::Message&);
+ void acknowledgeImpl(qpid::messaging::Message&, bool cumulative);
void rejectImpl(qpid::messaging::Message&);
void releaseImpl(qpid::messaging::Message&);
void closeImpl();
@@ -204,12 +204,13 @@ class SessionImpl : public qpid::messagi
void operator()() { impl.releaseImpl(message); }
};
- struct Acknowledge1 : Command
+ struct Acknowledge2 : Command
{
qpid::messaging::Message& message;
+ bool cumulative;
- Acknowledge1(SessionImpl& i, qpid::messaging::Message& m) : Command(i), message(m) {}
- void operator()() { impl.acknowledgeImpl(message); }
+ Acknowledge2(SessionImpl& i, qpid::messaging::Message& m, bool c) : Command(i), message(m), cumulative(c) {}
+ void operator()() { impl.acknowledgeImpl(message, cumulative); }
};
struct CreateSender;
Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/windows/SslConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/windows/SslConnector.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/windows/SslConnector.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/client/windows/SslConnector.cpp Tue Jun 21 19:58:01 2011
@@ -77,7 +77,7 @@ public:
framing::ProtocolVersion pVersion,
const ConnectionSettings&,
ConnectionImpl*);
- virtual void connect(const std::string& host, int port);
+ virtual void connect(const std::string& host, const std::string& port);
virtual void connected(const Socket&);
unsigned int getSSF();
};
@@ -153,7 +153,7 @@ SslConnector::~SslConnector()
// Will this get reach via virtual method via boost::bind????
-void SslConnector::connect(const std::string& host, int port) {
+void SslConnector::connect(const std::string& host, const std::string& port) {
brokerHost = host;
TCPConnector::connect(host, port);
}
Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Jun 21 19:58:01 2011
@@ -146,6 +146,7 @@
#include "qpid/framing/AMQP_AllOperations.h"
#include "qpid/framing/AllInvoker.h"
#include "qpid/framing/ClusterConfigChangeBody.h"
+#include "qpid/framing/ClusterClockBody.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionAbortBody.h"
#include "qpid/framing/ClusterRetractOfferBody.h"
@@ -198,7 +199,7 @@ namespace _qmf = ::qmf::org::apache::qpi
* Currently use SVN revision to avoid clashes with versions from
* different branches.
*/
-const uint32_t Cluster::CLUSTER_VERSION = 1097431;
+const uint32_t Cluster::CLUSTER_VERSION = 1128070;
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
@@ -230,7 +231,6 @@ struct ClusterDispatcher : public framin
cluster.updateOffer(member, updatee, l);
}
void retractOffer(uint64_t updatee) { cluster.retractOffer(member, updatee, l); }
- void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); }
void errorCheck(uint8_t type, const framing::SequenceNumber& frameSeq) {
cluster.errorCheck(member, type, frameSeq, l);
}
@@ -240,6 +240,7 @@ struct ClusterDispatcher : public framin
void deliverToQueue(const std::string& queue, const std::string& message) {
cluster.deliverToQueue(queue, message, l);
}
+ void clock(uint64_t time) { cluster.clock(time, l); }
bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
};
@@ -253,7 +254,7 @@ Cluster::Cluster(const ClusterSettings&
self(cpg.self()),
clusterId(true),
mAgent(0),
- expiryPolicy(new ExpiryPolicy(mcast, self, broker.getTimer())),
+ expiryPolicy(new ExpiryPolicy(*this)),
mcast(cpg, poller, boost::bind(&Cluster::leave, this)),
dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)),
deliverEventQueue(boost::bind(&Cluster::deliveredEvent, this, _1),
@@ -365,7 +366,8 @@ void Cluster::addShadowConnection(const
assert(discarding);
pair<ConnectionMap::iterator, bool> ib
= connections.insert(ConnectionMap::value_type(c->getId(), c));
- assert(ib.second);
+ // Like this to avoid tripping up unused variable warning when NDEBUG set
+ if (!ib.second) assert(ib.second);
}
void Cluster::erase(const ConnectionId& id) {
@@ -667,6 +669,8 @@ void Cluster::initMapCompleted(Lock& l)
else { // I can go ready.
discarding = false;
setReady(l);
+ // Must be called *before* memberUpdate so first update will be generated.
+ failoverExchange->setReady();
memberUpdate(l);
updateMgmtMembership(l);
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
@@ -719,6 +723,20 @@ void Cluster::configChange(const MemberI
updateMgmtMembership(l); // Update on every config change for consistency
}
+struct ClusterClockTask : public sys::TimerTask {
+ Cluster& cluster;
+ sys::Timer& timer;
+
+ ClusterClockTask(Cluster& cluster, sys::Timer& timer, uint16_t clockInterval)
+ : TimerTask(Duration(clockInterval * TIME_MSEC),"ClusterClock"), cluster(cluster), timer(timer) {}
+
+ void fire() {
+ cluster.sendClockUpdate();
+ setupNextFire();
+ timer.add(this);
+ }
+};
+
void Cluster::becomeElder(Lock&) {
if (elder) return; // We were already the elder.
// We are the oldest, reactive links if necessary
@@ -726,6 +744,8 @@ void Cluster::becomeElder(Lock&) {
elder = true;
broker.getLinks().setPassive(false);
timer->becomeElder();
+
+ clockTimer.add(new ClusterClockTask(*this, clockTimer, settings.clockInterval));
}
void Cluster::makeOffer(const MemberId& id, Lock& ) {
@@ -846,7 +866,7 @@ void Cluster::updateOffer(const MemberId
if (updatee != self && url) {
QPID_LOG(debug, debugSnapshot());
if (mAgent) mAgent->clusterUpdate();
- // Updatee will call clusterUpdate when update completes
+ // Updatee will call clusterUpdate() via checkUpdateIn() when update completes
}
}
@@ -927,10 +947,11 @@ void Cluster::checkUpdateIn(Lock& l) {
if (!updateClosed) return; // Wait till update connection closes.
if (updatedMap) { // We're up to date
map = *updatedMap;
- failoverExchange->setUrls(getUrls(l));
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
state = CATCHUP;
memberUpdate(l);
+ // Must be called *after* memberUpdate() to avoid sending an extra update.
+ failoverExchange->setReady();
// NB: don't updateMgmtMembership() here as we are not in the deliver
// thread. It will be updated on delivery of the "ready" we just mcast.
broker.setClusterUpdatee(false);
@@ -1120,10 +1141,6 @@ void Cluster::setClusterId(const Uuid& u
QPID_LOG(notice, *this << " cluster-uuid = " << clusterId);
}
-void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) {
- expiryPolicy->deliverExpire(id);
-}
-
void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNumber frameSeq, Lock&) {
// If we see an errorCheck here (rather than in the ErrorCheck
// class) then we have processed succesfully past the point of the
@@ -1161,6 +1178,35 @@ void Cluster::deliverToQueue(const std::
q->deliver(msg);
}
+sys::AbsTime Cluster::getClusterTime() {
+ Mutex::ScopedLock l(lock);
+ return clusterTime;
+}
+
+// This method is called during update on the updatee to set the initial cluster time.
+void Cluster::clock(const uint64_t time) {
+ Mutex::ScopedLock l(lock);
+ clock(time, l);
+}
+
+// called when broadcast message received
+void Cluster::clock(const uint64_t time, Lock&) {
+ clusterTime = AbsTime(EPOCH, time);
+ AbsTime now = AbsTime::now();
+
+ if (!elder) {
+ clusterTimeOffset = Duration(now, clusterTime);
+ }
+}
+
+// called by elder timer to send clock broadcast
+void Cluster::sendClockUpdate() {
+ Mutex::ScopedLock l(lock);
+ int64_t nanosecondsSinceEpoch = Duration(EPOCH, now());
+ nanosecondsSinceEpoch += clusterTimeOffset;
+ mcast.mcastControl(ClusterClockBody(ProtocolVersion(), nanosecondsSinceEpoch), self);
+}
+
bool Cluster::deferDeliveryImpl(const std::string& queue,
const boost::intrusive_ptr<broker::Message>& msg)
{
Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/Cluster.h?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/Cluster.h Tue Jun 21 19:58:01 2011
@@ -63,6 +63,12 @@ class AMQBody;
struct Uuid;
}
+namespace sys {
+class Timer;
+class AbsTime;
+class Duration;
+}
+
namespace cluster {
class Connection;
@@ -135,6 +141,9 @@ class Cluster : private Cpg::Handler, pu
bool deferDeliveryImpl(const std::string& queue,
const boost::intrusive_ptr<broker::Message>& msg);
+ sys::AbsTime getClusterTime();
+ void sendClockUpdate();
+ void clock(const uint64_t time);
private:
typedef sys::Monitor::ScopedLock Lock;
@@ -180,12 +189,12 @@ class Cluster : private Cpg::Handler, pu
const std::string& left,
const std::string& joined,
Lock& l);
- void messageExpired(const MemberId&, uint64_t, Lock& l);
void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&);
void timerWakeup(const MemberId&, const std::string& name, Lock&);
void timerDrop(const MemberId&, const std::string& name, Lock&);
void shutdown(const MemberId&, const framing::Uuid& shutdownId, Lock&);
void deliverToQueue(const std::string& queue, const std::string& message, Lock&);
+ void clock(const uint64_t time, Lock&);
// Helper functions
ConnectionPtr getConnection(const EventFrame&, Lock&);
@@ -296,6 +305,9 @@ class Cluster : private Cpg::Handler, pu
ErrorCheck error;
UpdateReceiver updateReceiver;
ClusterTimer* timer;
+ sys::Timer clockTimer;
+ sys::AbsTime clusterTime;
+ sys::Duration clusterTimeOffset;
friend std::ostream& operator<<(std::ostream&, const Cluster&);
friend struct ClusterDispatcher;
Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Tue Jun 21 19:58:01 2011
@@ -72,6 +72,7 @@ struct ClusterOptions : public Options {
("cluster-cman", optValue(settings.quorum), "Integrate with Cluster Manager (CMAN) cluster.")
#endif
("cluster-size", optValue(settings.size, "N"), "Wait for N cluster members before allowing clients to connect.")
+ ("cluster-clock-interval", optValue(settings.clockInterval,"N"), "How often to broadcast the current time to the cluster nodes, in milliseconds. A value between 5 and 1000 is recommended.")
("cluster-read-max", optValue(settings.readMax,"N"), "Experimental: flow-control limit reads per connection. 0=no limit.")
;
}
Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/ClusterSettings.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/ClusterSettings.h?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/ClusterSettings.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/ClusterSettings.h Tue Jun 21 19:58:01 2011
@@ -35,8 +35,9 @@ struct ClusterSettings {
size_t readMax;
std::string username, password, mechanism;
size_t size;
+ uint16_t clockInterval;
- ClusterSettings() : quorum(false), readMax(10), size(1)
+ ClusterSettings() : quorum(false), readMax(10), size(1), clockInterval(10)
{}
Url getUrl(uint16_t port) const {
Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp Tue Jun 21 19:58:01 2011
@@ -70,6 +70,7 @@ void ClusterTimer::add(intrusive_ptr<Tim
if (i != map.end())
throw Exception(QPID_MSG("Task already exists with name " << task->getName()));
map[task->getName()] = task;
+
// Only the elder actually activates the task with the Timer base class.
if (cluster.isElder()) {
QPID_LOG(trace, "Elder activating cluster timer task " << task->getName());
@@ -112,6 +113,9 @@ void ClusterTimer::deliverWakeup(const s
else {
intrusive_ptr<TimerTask> t = i->second;
map.erase(i);
+ // Move the nextFireTime so readyToFire() is true. This is to ensure we
+ // don't get an error if the fired task calls setupNextFire()
+ t->setFired();
Timer::fire(t);
}
}
Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/Connection.cpp Tue Jun 21 19:58:01 2011
@@ -322,10 +322,10 @@ size_t Connection::decode(const char* da
while (localDecoder.decode(buf))
received(localDecoder.getFrame());
if (!wasOpen && connection->isOpen()) {
- // Connections marked as federation links are allowed to proxy
+ // Connections marked with setUserProxyAuth are allowed to proxy
// messages with user-ID that doesn't match the connection's
// authenticated ID. This is important for updates.
- connection->setFederationLink(isCatchUp());
+ connection->setUserProxyAuth(isCatchUp());
}
}
else { // Multicast local connections.
@@ -601,10 +601,6 @@ void Connection::queueObserverState(cons
QPID_LOG(error, "Failed to find observer " << observerId << " state on queue " << qname << "; this will result in inconsistencies.");
}
-void Connection::expiryId(uint64_t id) {
- cluster.getExpiryPolicy().setId(id);
-}
-
std::ostream& operator<<(std::ostream& o, const Connection& c) {
const char* type="unknown";
if (c.isLocal()) type = "local";
@@ -724,5 +720,16 @@ void Connection::doCatchupIoCallbacks()
if (catchUp) getBrokerConnection()->doIoCallbacks();
}
+
+void Connection::clock(uint64_t time) {
+ QPID_LOG(debug, "Cluster connection received time update");
+ cluster.clock(time);
+}
+
+void Connection::queueDequeueSincePurgeState(const std::string& qname, uint32_t dequeueSincePurge) {
+ boost::shared_ptr<broker::Queue> queue(findQueue(qname));
+ queue->setDequeueSincePurge(dequeueSincePurge);
+}
+
}} // Namespace qpid::cluster
Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/Connection.h?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/Connection.h Tue Jun 21 19:58:01 2011
@@ -155,7 +155,6 @@ class Connection :
void queuePosition(const std::string&, const framing::SequenceNumber&);
void queueFairshareState(const std::string&, const uint8_t priority, const uint8_t count);
void queueObserverState(const std::string&, const std::string&, const framing::FieldTable&);
- void expiryId(uint64_t);
void txStart();
void txAccept(const framing::SequenceSet&);
@@ -192,6 +191,10 @@ class Connection :
void doCatchupIoCallbacks();
+ void clock(uint64_t time);
+
+ void queueDequeueSincePurgeState(const std::string&, uint32_t);
+
private:
struct NullFrameHandler : public framing::FrameHandler {
void handle(framing::AMQFrame&) {}
Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp Tue Jun 21 19:58:01 2011
@@ -21,106 +21,21 @@
#include "qpid/broker/Message.h"
#include "qpid/cluster/ExpiryPolicy.h"
-#include "qpid/cluster/Multicaster.h"
-#include "qpid/framing/ClusterMessageExpiredBody.h"
+#include "qpid/cluster/Cluster.h"
#include "qpid/sys/Time.h"
-#include "qpid/sys/Timer.h"
#include "qpid/log/Statement.h"
namespace qpid {
namespace cluster {
-ExpiryPolicy::ExpiryPolicy(Multicaster& m, const MemberId& id, sys::Timer& t)
- : expiryId(1), expiredPolicy(new Expired), mcast(m), memberId(id), timer(t) {}
+ExpiryPolicy::ExpiryPolicy(Cluster& cluster) : cluster(cluster) {}
-struct ExpiryTask : public sys::TimerTask {
- ExpiryTask(const boost::intrusive_ptr<ExpiryPolicy>& policy, uint64_t id, sys::AbsTime when)
- : TimerTask(when,"ExpiryPolicy"), expiryPolicy(policy), expiryId(id) {}
- void fire() { expiryPolicy->sendExpire(expiryId); }
- boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
- const uint64_t expiryId;
-};
-
-// Called while receiving an update
-void ExpiryPolicy::setId(uint64_t id) {
- sys::Mutex::ScopedLock l(lock);
- expiryId = id;
-}
-
-// Called while giving an update
-uint64_t ExpiryPolicy::getId() const {
- sys::Mutex::ScopedLock l(lock);
- return expiryId;
-}
-
-// Called in enqueuing connection thread
-void ExpiryPolicy::willExpire(broker::Message& m) {
- uint64_t id;
- {
- // When messages are fanned out to multiple queues, update sends
- // them as independenty messages so we can have multiple messages
- // with the same expiry ID.
- //
- sys::Mutex::ScopedLock l(lock);
- id = expiryId++;
- if (!id) { // This is an update of an already-expired message.
- m.setExpiryPolicy(expiredPolicy);
- }
- else {
- assert(unexpiredByMessage.find(&m) == unexpiredByMessage.end());
- // If this is an update, the id may already exist
- unexpiredById.insert(IdMessageMap::value_type(id, &m));
- unexpiredByMessage[&m] = id;
- }
- }
- timer.add(new ExpiryTask(this, id, m.getExpiration()));
-}
-
-// Called in dequeueing connection thread
-void ExpiryPolicy::forget(broker::Message& m) {
- sys::Mutex::ScopedLock l(lock);
- MessageIdMap::iterator i = unexpiredByMessage.find(&m);
- assert(i != unexpiredByMessage.end());
- unexpiredById.erase(i->second);
- unexpiredByMessage.erase(i);
-}
-
-// Called in dequeueing connection or cleanup thread.
bool ExpiryPolicy::hasExpired(broker::Message& m) {
- sys::Mutex::ScopedLock l(lock);
- return unexpiredByMessage.find(&m) == unexpiredByMessage.end();
-}
-
-// Called in timer thread
-void ExpiryPolicy::sendExpire(uint64_t id) {
- {
- sys::Mutex::ScopedLock l(lock);
- // Don't multicast an expiry notice if message is already forgotten.
- if (unexpiredById.find(id) == unexpiredById.end()) return;
- }
- mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(), id), memberId);
+ return m.getExpiration() < cluster.getClusterTime();
}
-// Called in CPG deliver thread.
-void ExpiryPolicy::deliverExpire(uint64_t id) {
- sys::Mutex::ScopedLock l(lock);
- std::pair<IdMessageMap::iterator, IdMessageMap::iterator> expired = unexpiredById.equal_range(id);
- IdMessageMap::iterator i = expired.first;
- while (i != expired.second) {
- i->second->setExpiryPolicy(expiredPolicy); // hasExpired() == true;
- unexpiredByMessage.erase(i->second);
- unexpiredById.erase(i++);
- }
+sys::AbsTime ExpiryPolicy::getCurrentTime() {
+ return cluster.getClusterTime();
}
-// Called in update thread on the updater.
-boost::optional<uint64_t> ExpiryPolicy::getId(broker::Message& m) {
- sys::Mutex::ScopedLock l(lock);
- MessageIdMap::iterator i = unexpiredByMessage.find(&m);
- return i == unexpiredByMessage.end() ? boost::optional<uint64_t>() : i->second;
-}
-
-bool ExpiryPolicy::Expired::hasExpired(broker::Message&) { return true; }
-void ExpiryPolicy::Expired::willExpire(broker::Message&) { }
-
}} // namespace qpid::cluster
Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h Tue Jun 21 19:58:01 2011
@@ -36,12 +36,8 @@ namespace broker {
class Message;
}
-namespace sys {
-class Timer;
-}
-
namespace cluster {
-class Multicaster;
+class Cluster;
/**
* Cluster expiry policy
@@ -49,43 +45,13 @@ class Multicaster;
class ExpiryPolicy : public broker::ExpiryPolicy
{
public:
- ExpiryPolicy(Multicaster&, const MemberId&, sys::Timer&);
+ ExpiryPolicy(Cluster& cluster);
- void willExpire(broker::Message&);
bool hasExpired(broker::Message&);
- void forget(broker::Message&);
-
- // Send expiration notice to cluster.
- void sendExpire(uint64_t);
-
- // Cluster delivers expiry notice.
- void deliverExpire(uint64_t);
+ qpid::sys::AbsTime getCurrentTime();
- void setId(uint64_t id);
- uint64_t getId() const;
-
- boost::optional<uint64_t> getId(broker::Message&);
-
private:
- typedef std::map<broker::Message*, uint64_t> MessageIdMap;
- // When messages are fanned out to multiple queues, update sends
- // them as independenty messages so we can have multiple messages
- // with the same expiry ID.
- typedef std::multimap<uint64_t, broker::Message*> IdMessageMap;
-
- struct Expired : public broker::ExpiryPolicy {
- bool hasExpired(broker::Message&);
- void willExpire(broker::Message&);
- };
-
- mutable sys::Mutex lock;
- MessageIdMap unexpiredByMessage;
- IdMessageMap unexpiredById;
- uint64_t expiryId;
- boost::intrusive_ptr<Expired> expiredPolicy;
- Multicaster& mcast;
- MemberId memberId;
- sys::Timer& timer;
+ Cluster& cluster;
};
}} // namespace qpid::cluster
Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/FailoverExchange.cpp Tue Jun 21 19:58:01 2011
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -39,8 +39,10 @@ using namespace broker;
using namespace framing;
const string FailoverExchange::typeName("amq.failover");
-
-FailoverExchange::FailoverExchange(management::Manageable* parent, Broker* b) : Exchange(typeName, parent, b ) {
+
+FailoverExchange::FailoverExchange(management::Manageable* parent, Broker* b)
+ : Exchange(typeName, parent, b ), ready(false)
+{
if (mgmtExchange != 0)
mgmtExchange->set_type(typeName);
}
@@ -53,16 +55,17 @@ void FailoverExchange::setUrls(const vec
void FailoverExchange::updateUrls(const vector<Url>& u) {
Lock l(lock);
urls=u;
- if (urls.empty()) return;
- std::for_each(queues.begin(), queues.end(),
- boost::bind(&FailoverExchange::sendUpdate, this, _1));
+ if (ready && !urls.empty()) {
+ std::for_each(queues.begin(), queues.end(),
+ boost::bind(&FailoverExchange::sendUpdate, this, _1));
+ }
}
string FailoverExchange::getType() const { return typeName; }
bool FailoverExchange::bind(Queue::shared_ptr queue, const string&, const framing::FieldTable*) {
Lock l(lock);
- sendUpdate(queue);
+ if (ready) sendUpdate(queue);
return queues.insert(queue).second;
}
@@ -84,7 +87,7 @@ void FailoverExchange::sendUpdate(const
// Called with lock held.
if (urls.empty()) return;
framing::Array array(0x95);
- for (Urls::const_iterator i = urls.begin(); i != urls.end(); ++i)
+ for (Urls::const_iterator i = urls.begin(); i != urls.end(); ++i)
array.add(boost::shared_ptr<Str16Value>(new Str16Value(i->str())));
const ProtocolVersion v;
boost::intrusive_ptr<Message> msg(new Message);
@@ -96,9 +99,12 @@ void FailoverExchange::sendUpdate(const
header.get<MessageProperties>(true)->getApplicationHeaders().setArray(typeName, array);
AMQFrame headerFrame(header);
headerFrame.setFirstSegment(false);
- msg->getFrames().append(headerFrame);
+ msg->getFrames().append(headerFrame);
DeliverableMessage(msg).deliverTo(queue);
}
+void FailoverExchange::setReady() {
+ ready = true;
+}
}} // namespace cluster
Modified: qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/FailoverExchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/FailoverExchange.h?rev=1138153&r1=1138152&r2=1138153&view=diff
==============================================================================
--- qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/FailoverExchange.h (original)
+++ qpid/branches/qpid-3079/qpid/cpp/src/qpid/cluster/FailoverExchange.h Tue Jun 21 19:58:01 2011
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -46,6 +46,8 @@ class FailoverExchange : public broker::
void setUrls(const std::vector<Url>&);
/** Set the URLs and send an update.*/
void updateUrls(const std::vector<Url>&);
+ /** Flag the failover exchange as ready to generate updates (caught up) */
+ void setReady();
// Exchange overrides
std::string getType() const;
@@ -56,7 +58,7 @@ class FailoverExchange : public broker::
private:
void sendUpdate(const boost::shared_ptr<broker::Queue>&);
-
+
typedef sys::Mutex::ScopedLock Lock;
typedef std::vector<Url> Urls;
typedef std::set<boost::shared_ptr<broker::Queue> > Queues;
@@ -64,7 +66,7 @@ class FailoverExchange : public broker::
sys::Mutex lock;
Urls urls;
Queues queues;
-
+ bool ready;
};
}} // namespace qpid::cluster
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org