You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2012/08/03 14:14:12 UTC
svn commit: r1368910 [8/27] - in /qpid/branches/asyncstore: ./ bin/
cpp/bindings/qmf/ruby/ cpp/bindings/qmf2/ruby/ cpp/bindings/qpid/python/
cpp/bindings/qpid/ruby/ cpp/bindings/qpid/ruby/features/
cpp/bindings/qpid/ruby/features/step_definitions/ cpp/...
Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/ReplicatingSubscription.cpp Fri Aug 3 12:13:32 2012
@@ -19,13 +19,18 @@
*
*/
+#include "QueueGuard.h"
+#include "QueueRange.h"
+#include "QueueReplicator.h"
#include "ReplicatingSubscription.h"
+#include "Primary.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/SessionContext.h"
#include "qpid/broker/ConnectionState.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Statement.h"
+#include "qpid/types/Uuid.h"
#include <sstream>
namespace qpid {
@@ -34,19 +39,90 @@ namespace ha {
using namespace framing;
using namespace broker;
using namespace std;
+using sys::Mutex;
-const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.replicating-subscription");
+const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.ha-replicating-subscription");
+const string ReplicatingSubscription::QPID_BACK("qpid.ha-back");
+const string ReplicatingSubscription::QPID_FRONT("qpid.ha-front");
+const string ReplicatingSubscription::QPID_BROKER_INFO("qpid.ha-broker-info");
namespace {
const string DOLLAR("$");
const string INTERNAL("-internal");
} // namespace
+// Scan the queue for gaps and add them to the subscriptions dequed set.
+class DequeueScanner
+{
+ public:
+ DequeueScanner(
+ ReplicatingSubscription& rs,
+ SequenceNumber front_,
+ SequenceNumber back_ // Inclusive
+ ) : subscription(rs), front(front_), back(back_)
+ {
+ assert(front <= back);
+ // INVARIANT deques have been added for positions <= at.
+ at = front - 1;
+ }
+
+ void operator()(const QueuedMessage& qm) {
+ if (qm.position >= front && qm.position <= back) {
+ if (qm.position > at+1) subscription.dequeued(at+1, qm.position-1);
+ at = qm.position;
+ }
+ }
+
+ // Must call after scanning the queue.
+ void finish() {
+ if (at < back) subscription.dequeued(at+1, back);
+ }
+
+ private:
+ ReplicatingSubscription& subscription;
+ SequenceNumber front;
+ SequenceNumber back;
+ SequenceNumber at;
+};
+
string mask(const string& in)
{
return DOLLAR + in + INTERNAL;
}
+
+/** Dummy consumer used to get the front position on the queue */
+class GetPositionConsumer : public Consumer
+{
+ public:
+ GetPositionConsumer() :
+ Consumer("ha.GetPositionConsumer."+types::Uuid(true).str(), false) {}
+ bool deliver(broker::QueuedMessage& ) { return true; }
+ void notify() {}
+ bool filter(boost::intrusive_ptr<broker::Message>) { return true; }
+ bool accept(boost::intrusive_ptr<broker::Message>) { return true; }
+ void cancel() {}
+ void acknowledged(const broker::QueuedMessage&) {}
+ bool browseAcquired() const { return true; }
+ broker::OwnershipToken* getSession() { return 0; }
+};
+
+
+bool ReplicatingSubscription::getNext(
+ broker::Queue& q, SequenceNumber from, SequenceNumber& result)
+{
+ boost::shared_ptr<Consumer> c(new GetPositionConsumer);
+ c->setPosition(from);
+ if (!q.dispatch(c)) return false;
+ result = c->getPosition();
+ return true;
+}
+
+bool ReplicatingSubscription::getFront(broker::Queue& q, SequenceNumber& front) {
+ // FIXME aconway 2012-05-23: won't wrap, assumes 0 is < all messages in queue.
+ return getNext(q, 0, front);
+}
+
/* Called by SemanticState::consume to create a consumer */
boost::shared_ptr<broker::SemanticState::ConsumerImpl>
ReplicatingSubscription::Factory::create(
@@ -66,7 +142,7 @@ ReplicatingSubscription::Factory::create
rs.reset(new ReplicatingSubscription(
parent, name, queue, ack, acquire, exclusive, tag,
resumeId, resumeTtl, arguments));
- queue->addObserver(rs);
+ rs->initialize();
}
return rs;
}
@@ -84,179 +160,223 @@ ReplicatingSubscription::ReplicatingSubs
const framing::FieldTable& arguments
) : ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag,
resumeId, resumeTtl, arguments),
- events(new Queue(mask(name))),
- consumer(new DelegatingConsumer(*this))
+ dummy(new Queue(mask(name))),
+ ready(false)
{
- // Separate the remote part from a "local-remote" address.
- string address = parent->getSession().getConnection().getUrl();
- size_t i = address.find('-');
- if (i != string::npos) address = address.substr(i+1);
- logPrefix = "HA: Primary ";
- stringstream ss;
- logSuffix = " (" + address + ")";
-
- // FIXME aconway 2011-12-09: Failover optimization removed.
- // There was code here to re-use messages already on the backup
- // during fail-over. This optimization was removed to simplify
- // the logic till we get the basic replication stable, it
- // can be re-introduced later. Last revision with the optimization:
- // r1213258 | QPID-3603: Fix QueueReplicator subscription parameters.
-
- QPID_LOG(debug, logPrefix << "created backup subscription " << getName() << logSuffix);
-
- // FIXME aconway 2011-12-15: ConsumerImpl::position is left at 0
- // so we will start consuming from the lowest numbered message.
- // This is incorrect if the sequence number wraps around, but
- // this is what all consumers currently do.
-}
-
-// Message is delivered in the subscription's connection thread.
-bool ReplicatingSubscription::deliver(QueuedMessage& m) {
try {
- // Add position events for the subscribed queue, not for the internal event queue.
- if (m.queue && m.queue == getQueue().get()) {
- sys::Mutex::ScopedLock l(lock);
- if (position != m.position)
- throw Exception(
- QPID_MSG("Expected position " << position
- << " but got " << m.position));
- // m.position is the position of the newly enqueued m on the local queue.
- // backupPosition is latest position on the backup queue (before enqueueing m.)
- if (m.position <= backupPosition)
- throw Exception(
- QPID_MSG("Expected position > " << backupPosition
- << " but got " << m.position));
-
- if (m.position - backupPosition > 1) {
- // Position has advanced because of messages dequeued ahead of us.
- SequenceNumber send(m.position);
- --send; // Send the position before m was enqueued.
- sendPositionEvent(send, l);
+ FieldTable ft;
+ if (!arguments.getTable(ReplicatingSubscription::QPID_BROKER_INFO, ft))
+ throw Exception("Replicating subscription does not have broker info: " + tag);
+ info.assign(ft);
+
+ // Set a log prefix message that identifies the remote broker.
+ ostringstream os;
+ os << "Primary " << queue->getName() << "@" << info.getLogId() << ": ";
+ logPrefix = os.str();
+
+ // NOTE: Once the guard is attached we can have concurrent
+ // calls to dequeued so we need to lock use of this->dequeues.
+ //
+ // However we must attach the guard _before_ we scan for
+ // initial dequeues to be sure we don't miss any dequeues
+ // between the scan and attaching the guard.
+ if (Primary::get()) guard = Primary::get()->getGuard(queue, info);
+ if (!guard) guard.reset(new QueueGuard(*queue, info));
+ guard->attach(*this);
+
+ QueueRange backup(arguments); // Remote backup range.
+ QueueRange primary(guard->getRange()); // Unguarded range when the guard was set.
+ backupPosition = backup.back;
+
+ // Sync backup and primary queues, don't send messages already on the backup
+
+ if (backup.front > primary.front || // Missing messages at front
+ backup.back < primary.front || // No overlap
+ primary.empty() || backup.empty()) // Empty
+ {
+ // No useful overlap - erase backup and start from the beginning
+ if (!backup.empty()) dequeued(backup.front, backup.back);
+ position = primary.front-1;
+ }
+ else { // backup and primary do overlap.
+ // Remove messages from backup that are not in primary.
+ if (primary.back < backup.back) {
+ dequeued(primary.back+1, backup.back); // Trim excess messages at back
+ backup.back = primary.back;
+ }
+ if (backup.front < primary.front) {
+ dequeued(backup.front, primary.front-1); // Trim excess messages at front
+ backup.front = primary.front;
}
- backupPosition = m.position;
- QPID_LOG(trace, logPrefix << "replicating " << m << logSuffix);
+ DequeueScanner scan(*this, backup.front, backup.back);
+ // FIXME aconway 2012-06-15: Optimize queue traversal, only in range.
+ queue->eachMessage(boost::ref(scan)); // Remove missing messages in between.
+ scan.finish();
+ position = backup.back;
}
- return ConsumerImpl::deliver(m);
- } catch (const std::exception& e) {
- QPID_LOG(critical, logPrefix << "error replicating " << getQueue()->getName()
- << logSuffix << ": " << e.what());
+ // NOTE: we are assuming that the messages that are on the backup are
+ // consistent with those on the primary. If the backup is a replica
+ // queue and hasn't been tampered with then that will be the case.
+
+ QPID_LOG(debug, logPrefix << "Subscribed:"
+ << " backup:" << backup
+ << " primary:" << primary
+ << " catch-up: " << position << "-" << primary.back
+ << "(" << primary.back-position << ")");
+
+ // Check if we are ready yet.
+ if (guard->subscriptionStart(position)) setReady();
+ }
+ catch (const std::exception& e) {
+ QPID_LOG(error, logPrefix << "Creation error: " << e.what()
+ << ": arguments=" << getArguments());
throw;
}
}
-ReplicatingSubscription::~ReplicatingSubscription() {}
-
+ReplicatingSubscription::~ReplicatingSubscription() {
+ QPID_LOG(debug, logPrefix << "Detroyed replicating subscription");
+}
-// INVARIANT: delayed contains msg <=> we have outstanding startCompletion on msg
+// Called in subscription's connection thread when the subscription is created.
+// Called separate from ctor because sending events requires
+// shared_from_this
+//
+void ReplicatingSubscription::initialize() {
+ try {
+ Mutex::ScopedLock l(lock); // Note dequeued() can be called concurrently.
-// Mark a message completed. May be called by acknowledge or dequeued
-void ReplicatingSubscription::complete(
- const QueuedMessage& qm, const sys::Mutex::ScopedLock&)
-{
- // Handle completions for the subscribed queue, not the internal event queue.
- if (qm.queue && qm.queue == getQueue().get()) {
- QPID_LOG(trace, logPrefix << "completed " << qm << logSuffix);
- Delayed::iterator i= delayed.find(qm.position);
- // The same message can be completed twice, by acknowledged and
- // dequeued, remove it from the set so it only gets completed
- // once.
- if (i != delayed.end()) {
- assert(i->second.payload == qm.payload);
- qm.payload->getIngressCompletion().finishCompleter();
- delayed.erase(i);
- }
+ // Send initial dequeues and position to the backup.
+ // There must be a shared_ptr(this) when sending.
+ sendDequeueEvent(l);
+ sendPositionEvent(position, l);
+ backupPosition = position;
+ }
+ catch (const std::exception& e) {
+ QPID_LOG(error, logPrefix << "Initialization error: " << e.what()
+ << ": arguments=" << getArguments());
+ throw;
}
}
-// Called before we get notified of the message being available and
-// under the message lock in the queue. Called in arbitrary connection thread.
-void ReplicatingSubscription::enqueued(const QueuedMessage& qm) {
- sys::Mutex::ScopedLock l(lock);
- // Delay completion
- QPID_LOG(trace, logPrefix << "delaying completion of " << qm << logSuffix);
- qm.payload->getIngressCompletion().startCompleter();
- assert(delayed.find(qm.position) == delayed.end());
- delayed[qm.position] = qm;
+// Message is delivered in the subscription's connection thread.
+bool ReplicatingSubscription::deliver(QueuedMessage& qm) {
+ try {
+ // Add position events for the subscribed queue, not the internal event queue.
+ if (qm.queue == getQueue().get()) {
+ QPID_LOG(trace, logPrefix << "Replicating " << qm);
+ {
+ Mutex::ScopedLock l(lock);
+ assert(position == qm.position);
+ // qm.position is the position of the newly enqueued qm on local queue.
+ // backupPosition is latest position on backup queue before enqueueing
+ if (qm.position <= backupPosition)
+ throw Exception(
+ QPID_MSG("Expected position > " << backupPosition
+ << " but got " << qm.position));
+ if (qm.position - backupPosition > 1) {
+ // Position has advanced because of messages dequeued ahead of us.
+ // Send the position before qm was enqueued.
+ sendPositionEvent(qm.position-1, l);
+ }
+ // Backup will automatically advance by 1 on delivery of message.
+ backupPosition = qm.position;
+ }
+ }
+ return ConsumerImpl::deliver(qm);
+ } catch (const std::exception& e) {
+ QPID_LOG(critical, logPrefix << "Error replicating " << qm
+ << ": " << e.what());
+ throw;
+ }
}
-
-// Function to complete a delayed message, called by cancel()
-void ReplicatingSubscription::cancelComplete(
- const Delayed::value_type& v, const sys::Mutex::ScopedLock&)
-{
- QPID_LOG(trace, logPrefix << "cancel completed " << v.second << logSuffix);
- v.second.payload->getIngressCompletion().finishCompleter();
+void ReplicatingSubscription::setReady() {
+ {
+ Mutex::ScopedLock l(lock);
+ if (ready) return;
+ ready = true;
+ }
+ // Notify Primary that a subscription is ready.
+ QPID_LOG(debug, logPrefix << "Caught up");
+ if (Primary::get()) Primary::get()->readyReplica(*this);
}
// Called in the subscription's connection thread.
void ReplicatingSubscription::cancel()
{
- getQueue()->removeObserver(
- boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
- {
- sys::Mutex::ScopedLock l(lock);
- QPID_LOG(debug, logPrefix << "cancel backup subscription " << getName() << logSuffix);
- for_each(delayed.begin(), delayed.end(),
- boost::bind(&ReplicatingSubscription::cancelComplete, this, _1, boost::ref(l)));
- delayed.clear();
- }
+ guard->cancel();
ConsumerImpl::cancel();
}
-// Called on primary in the backups IO thread.
-void ReplicatingSubscription::acknowledged(const QueuedMessage& msg) {
- sys::Mutex::ScopedLock l(lock);
- // Finish completion of message, it has been acknowledged by the backup.
- complete(msg, l);
+// Consumer override, called on primary in the backup's IO thread.
+void ReplicatingSubscription::acknowledged(const QueuedMessage& qm) {
+ if (qm.queue == getQueue().get()) { // Don't complete messages on the internal queue
+ // Finish completion of message, it has been acknowledged by the backup.
+ QPID_LOG(trace, logPrefix << "Acknowledged " << qm);
+ guard->complete(qm);
+ // If next message is protected by the guard then we are ready
+ if (qm.position >= guard->getRange().back) setReady();
+ }
+ ConsumerImpl::acknowledged(qm);
}
-// Hide the "queue deleted" error for a ReplicatingSubscription when a
-// queue is deleted, this is normal and not an error.
-bool ReplicatingSubscription::hideDeletedError() { return true; }
-
// Called with lock held. Called in subscription's connection thread.
-void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock& l)
+void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock&)
{
- QPID_LOG(trace, logPrefix << "sending dequeues " << dequeues
- << " from " << getQueue()->getName() << logSuffix);
+ if (dequeues.empty()) return;
+ QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues);
string buf(dequeues.encodedSize(),'\0');
framing::Buffer buffer(&buf[0], buf.size());
dequeues.encode(buffer);
dequeues.clear();
buffer.reset();
- sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l);
+ {
+ Mutex::ScopedUnlock u(lock);
+ sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer);
+ }
}
-// Called after the message has been removed from the deque and under
-// the messageLock in the queue. Called in arbitrary connection threads.
+// Called via QueueObserver::dequeued override on guard.
+// Called after the message has been removed
+// from the deque and under the messageLock in the queue. Called in
+// arbitrary connection threads.
void ReplicatingSubscription::dequeued(const QueuedMessage& qm)
{
+ assert (qm.queue == getQueue().get());
+ QPID_LOG(trace, logPrefix << "Dequeued " << qm);
{
- sys::Mutex::ScopedLock l(lock);
- QPID_LOG(trace, logPrefix << "dequeued " << qm << logSuffix);
+ Mutex::ScopedLock l(lock);
dequeues.add(qm.position);
- // If we have not yet sent this message to the backup, then
- // complete it now as it will never be accepted.
- if (qm.position > position) complete(qm, l);
}
notify(); // Ensure a call to doDispatch
}
+// Called during construction while scanning for initial dequeues.
+void ReplicatingSubscription::dequeued(SequenceNumber first, SequenceNumber last) {
+ QPID_LOG(trace, logPrefix << "Initial dequeue [" << first << ", " << last << "]");
+ {
+ Mutex::ScopedLock l(lock);
+ dequeues.add(first,last);
+ }
+}
+
// Called with lock held. Called in subscription's connection thread.
-void ReplicatingSubscription::sendPositionEvent(
- SequenceNumber position, const sys::Mutex::ScopedLock&l )
+void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, Mutex::ScopedLock&)
{
- QPID_LOG(trace, logPrefix << "sending position " << position
- << ", was " << backupPosition << logSuffix);
- string buf(backupPosition.encodedSize(),'\0');
+ if (pos == backupPosition) return; // No need to send.
+ QPID_LOG(trace, logPrefix << "Sending position " << pos << ", was " << backupPosition);
+ string buf(pos.encodedSize(),'\0');
framing::Buffer buffer(&buf[0], buf.size());
- position.encode(buffer);
+ pos.encode(buffer);
buffer.reset();
- sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer, l);
+ {
+ Mutex::ScopedUnlock u(lock);
+ sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer);
+ }
}
-void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& buffer,
- const sys::Mutex::ScopedLock&)
+void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& buffer)
{
//generate event message
boost::intrusive_ptr<Message> event = new Message();
@@ -276,15 +396,14 @@ void ReplicatingSubscription::sendEvent(
event->getFrames().append(header);
event->getFrames().append(content);
- DeliveryProperties* props = event->getFrames().getHeaders()->get<DeliveryProperties>(true);
+ DeliveryProperties* props =
+ event->getFrames().getHeaders()->get<DeliveryProperties>(true);
props->setRoutingKey(key);
- // Send the event using the events queue. Consumer is a
- // DelegatingConsumer that delegates to *this for everything but
- // has an independnet position. We put an event on events and
- // dispatch it through ourselves to send it in line with the
- // normal browsing messages.
- events->deliver(event);
- events->dispatch(consumer);
+ // Send the event directly to the base consumer implementation.
+ // We don't really need a queue here but we pass a dummy queue
+ // to conform to the consumer API.
+ QueuedMessage qm(dummy.get(), event);
+ ConsumerImpl::deliver(qm);
}
@@ -292,19 +411,10 @@ void ReplicatingSubscription::sendEvent(
bool ReplicatingSubscription::doDispatch()
{
{
- sys::Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
if (!dequeues.empty()) sendDequeueEvent(l);
}
return ConsumerImpl::doDispatch();
}
-ReplicatingSubscription::DelegatingConsumer::DelegatingConsumer(ReplicatingSubscription& c) : Consumer(c.getName(), true), delegate(c) {}
-ReplicatingSubscription::DelegatingConsumer::~DelegatingConsumer() {}
-bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m) { return delegate.deliver(m); }
-void ReplicatingSubscription::DelegatingConsumer::notify() { delegate.notify(); }
-bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message> msg) { return delegate.filter(msg); }
-bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message> msg) { return delegate.accept(msg); }
-bool ReplicatingSubscription::DelegatingConsumer::browseAcquired() const { return delegate.browseAcquired(); }
-OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { return delegate.getSession(); }
-
}} // namespace qpid::ha
Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/ReplicatingSubscription.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/ReplicatingSubscription.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/ReplicatingSubscription.h Fri Aug 3 12:13:32 2012
@@ -22,10 +22,10 @@
*
*/
-#include "QueueReplicator.h" // For DEQUEUE_EVENT_KEY
+#include "BrokerInfo.h"
#include "qpid/broker/SemanticState.h"
-#include "qpid/broker/QueueObserver.h"
#include "qpid/broker/ConsumerFactory.h"
+#include "qpid/types/Uuid.h"
#include <iosfwd>
namespace qpid {
@@ -42,18 +42,27 @@ class Buffer;
}
namespace ha {
+class QueueGuard;
/**
- * A susbcription that represents a backup replicating a queue.
+ * A susbcription that replicates to a remote backup.
*
- * Runs on the primary. Delays completion of messages till the backup
- * has acknowledged, informs backup of locally dequeued messages.
+ * Runs on the primary. In conjunction with a QueueGuard, delays completion of
+ * messages till the backup has acknowledged, informs backup of locally dequeued
+ * messages.
+ *
+ * A ReplicatingSubscription is "ready" when all the messages on the queue have
+ * either been acknowledged by the backup, or are protected by the queue guard.
+ * On a primary broker the ReplicatingSubscription calls Primary::readyReplica
+ * when it is ready.
+ *
+ * THREAD SAFE: Called in subscription's connection thread but also in arbitrary
+ * connection threads via dequeued.
+ *
+ * Lifecycle: broker::Queue holds shared_ptrs to this as a consumer.
*
- * THREAD SAFE: Used as a consumer in subscription's connection
- * thread, and as a QueueObserver in arbitrary connection threads.
*/
-class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
- public broker::QueueObserver
+class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl
{
public:
struct Factory : public broker::ConsumerFactory {
@@ -67,6 +76,20 @@ class ReplicatingSubscription : public b
// Argument names for consume command.
static const std::string QPID_REPLICATING_SUBSCRIPTION;
+ static const std::string QPID_BACK;
+ static const std::string QPID_FRONT;
+ static const std::string QPID_BROKER_INFO;
+
+ // TODO aconway 2012-05-23: these don't belong on ReplicatingSubscription
+ /** Get position of front message on queue.
+ *@return false if queue is empty.
+ */
+ static bool getFront(broker::Queue&, framing::SequenceNumber& result);
+ /** Get next message after from in queue.
+ *@return false if none found.
+ */
+ static bool getNext(broker::Queue&, framing::SequenceNumber from,
+ framing::SequenceNumber& result);
ReplicatingSubscription(broker::SemanticState* parent,
const std::string& name, boost::shared_ptr<broker::Queue> ,
@@ -76,56 +99,46 @@ class ReplicatingSubscription : public b
~ReplicatingSubscription();
- // QueueObserver overrides.
- bool deliver(broker::QueuedMessage& msg);
- void enqueued(const broker::QueuedMessage&);
- void dequeued(const broker::QueuedMessage&);
- void acquired(const broker::QueuedMessage&) {}
- void requeued(const broker::QueuedMessage&) {}
+ // Called via QueueGuard::dequeued.
+ //@return true if the message requires completion.
+ void dequeued(const broker::QueuedMessage& qm);
+
+ // Called during initial scan for dequeues.
+ void dequeued(framing::SequenceNumber first, framing::SequenceNumber last);
// Consumer overrides.
+ bool deliver(broker::QueuedMessage& msg);
void cancel();
void acknowledged(const broker::QueuedMessage&);
bool browseAcquired() const { return true; }
+ // Hide the "queue deleted" error for a ReplicatingSubscription when a
+ // queue is deleted, this is normal and not an error.
+ bool hideDeletedError() { return true; }
+
+ /** Initialization that must be done separately from construction
+ * because it requires a shared_ptr to this to exist.
+ */
+ void initialize();
- bool hideDeletedError();
+ BrokerInfo getBrokerInfo() const { return info; }
protected:
bool doDispatch();
+
private:
- typedef std::map<framing::SequenceNumber, broker::QueuedMessage> Delayed;
- std::string logPrefix, logSuffix;
- boost::shared_ptr<broker::Queue> events;
- boost::shared_ptr<broker::Consumer> consumer;
- Delayed delayed;
+ std::string logPrefix;
+ boost::shared_ptr<broker::Queue> dummy; // Used to send event messages
framing::SequenceSet dequeues;
framing::SequenceNumber backupPosition;
-
- void complete(const broker::QueuedMessage&, const sys::Mutex::ScopedLock&);
- void cancelComplete(const Delayed::value_type& v, const sys::Mutex::ScopedLock&);
- void sendDequeueEvent(const sys::Mutex::ScopedLock&);
- void sendPositionEvent(framing::SequenceNumber, const sys::Mutex::ScopedLock&);
- void sendEvent(const std::string& key, framing::Buffer&,
- const sys::Mutex::ScopedLock&);
-
- class DelegatingConsumer : public Consumer
- {
- public:
- DelegatingConsumer(ReplicatingSubscription&);
- ~DelegatingConsumer();
- bool deliver(broker::QueuedMessage& msg);
- void notify();
- bool filter(boost::intrusive_ptr<broker::Message>);
- bool accept(boost::intrusive_ptr<broker::Message>);
- void cancel() {}
- void acknowledged(const broker::QueuedMessage&) {}
- bool browseAcquired() const;
-
- broker::OwnershipToken* getSession();
-
- private:
- ReplicatingSubscription& delegate;
- };
+ bool ready;
+ BrokerInfo info;
+ boost::shared_ptr<QueueGuard> guard;
+
+ void sendDequeueEvent(sys::Mutex::ScopedLock&);
+ void sendPositionEvent(framing::SequenceNumber, sys::Mutex::ScopedLock&);
+ void setReady();
+ void sendEvent(const std::string& key, framing::Buffer&);
+ friend struct Factory;
};
Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/Settings.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/Settings.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/Settings.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/Settings.h Fri Aug 3 12:13:32 2012
@@ -22,7 +22,7 @@
*
*/
-#include "ReplicateLevel.h"
+#include "types.h"
#include <string>
namespace qpid {
@@ -34,13 +34,15 @@ namespace ha {
class Settings
{
public:
- Settings() : cluster(false), expectedBackups(0), replicateDefault(RL_NONE) {}
+ Settings() : cluster(false), replicateDefault(NONE), backupTimeout(5)
+ {}
+
bool cluster; // True if we are a cluster member.
std::string clientUrl;
std::string brokerUrl;
- size_t expectedBackups;
- ReplicateLevel replicateDefault;
+ Enum<ReplicateLevel> replicateDefault;
std::string username, password, mechanism;
+ double backupTimeout;
private:
};
}} // namespace qpid::ha
Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/management-schema.xml
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/management-schema.xml?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/management-schema.xml (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/management-schema.xml Fri Aug 3 12:13:32 2012
@@ -25,37 +25,39 @@
<property name="status" type="sstr" desc="HA status: primary or backup"/>
- <property name="brokers" type="sstr"
- desc="Multiple-address URL used by HA brokers to connect to each other."/>
+ <property name="brokersUrl" type="sstr"
+ desc="URL with address of each broker in the cluster."/>
- <property name="publicBrokers" type="sstr"
- desc="Multiple-address URL used by clients to connect to the HA brokers."/>
+ <property name="publicUrl" type="sstr"
+ desc="URL advertized to clients to connect to the cluster."/>
- <property name="expectedBackups" type="uint16"
- desc="Number of HA backup brokers expected."/>
+ <property name="replicateDefault" type="sstr"
+ desc="Replication for queues/exchanges with no qpid.replicate argument"/>
- <property
- name="replicateDefault" type="sstr"
- desc="Replicate value for queues/exchanges without a qpid.replicate argument"/>
+ <property name="members" type="list" desc="List of brokers in the cluster"/>
+
+ <property name="systemId" type="uuid" desc="Identifies the system."/>
<method name="promote" desc="Promote a backup broker to primary."/>
- <method name="setBrokers" desc="Set URL for HA brokers to connect to each other.">
+ <method name="setBrokersUrl" desc="URL listing each broker in the cluster.">
<arg name="url" type="sstr" dir="I"/>
</method>
- <method name="setPublicBrokers" desc="Set URL for clients to connect to HA brokers">
+ <method name="setPublicUrl" desc="URL advertized to clients.">
<arg name="url" type="sstr" dir="I"/>
</method>
- <method name="setExpectedBackups" desc="Set number of backups expected">
- <arg name="expectedBackups" type="uint16" dir="I"/>
- </method>
-
<method name="replicate" desc="Replicate individual queue from remote broker.">
<arg name="broker" type="sstr" dir="I"/>
<arg name="queue" type="sstr" dir="I"/>
</method>
</class>
+ <eventArguments>
+ <arg name="members" type="list" desc="List of broker information maps"/>
+ </eventArguments>
+
+ <event name="membersUpdate" sev="inform" args="members"/>
+
</schema>
Modified: qpid/branches/asyncstore/cpp/src/qpid/log/Logger.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/log/Logger.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/log/Logger.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/log/Logger.cpp Fri Aug 3 12:13:32 2012
@@ -47,7 +47,7 @@ using namespace std;
typedef sys::Mutex::ScopedLock ScopedLock;
inline void Logger::enable_unlocked(Statement* s) {
- s->enabled=selector.isEnabled(s->level, s->function);
+ s->enabled=selector.isEnabled(s->level, s->function, s->category);
}
Logger& Logger::instance() {
@@ -95,6 +95,8 @@ void Logger::log(const Statement& s, con
else
qpid::sys::outputFormattedNow(os);
}
+ if (flags&CATEGORY)
+ os << "[" << CategoryTraits::name(s.category) << "] ";
if (flags&LEVEL)
os << LevelTraits::name(s.level) << " ";
if (flags&THREAD)
@@ -144,7 +146,8 @@ int Logger::format(const Options& opts)
bitIf(opts.source, (FILE|LINE)) |
bitIf(opts.function, FUNCTION) |
bitIf(opts.thread, THREAD) |
- bitIf(opts.hiresTs, HIRES);
+ bitIf(opts.hiresTs, HIRES) |
+ bitIf(opts.category, CATEGORY);
format(flags);
return flags;
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/log/Options.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/log/Options.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/log/Options.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/log/Options.cpp Fri Aug 3 12:13:32 2012
@@ -39,6 +39,7 @@ Options::Options(const std::string& argv
source(false),
function(false),
hiresTs(false),
+ category(true),
trace(false),
sinkOptions (SinkOptions::create(argv0_))
{
@@ -49,15 +50,23 @@ Options::Options(const std::string& argv
for (int i = 1; i < LevelTraits::COUNT; ++i)
levels << " " << LevelTraits::name(Level(i));
+ ostringstream categories;
+ categories << CategoryTraits::name(Category(0));
+ for (int i = 1; i < CategoryTraits::COUNT; ++i)
+ categories << " " << CategoryTraits::name(Category(i));
+
addOptions()
("trace,t", optValue(trace), "Enables all logging" )
("log-enable", optValue(selectors, "RULE"),
- ("Enables logging for selected levels and components. "
+ ("Enables logging for selected levels and components. "
"RULE is in the form 'LEVEL[+][:PATTERN]' "
- "Levels are one of: \n\t "+levels.str()+"\n"
+ "LEVEL is one of: \n\t "+levels.str()+"\n"
+ "PATTERN is a function name or a catogory: \n\t "+categories.str()+"\n"
"For example:\n"
"\t'--log-enable warning+' "
"logs all warning, error and critical messages.\n"
+ "\t'--log-enable trace+:Broker' "
+ "logs all category 'Broker' messages.\n"
"\t'--log-enable debug:framing' "
"logs debug messages from the framing namespace. "
"This option can be used multiple times").c_str())
@@ -67,6 +76,7 @@ Options::Options(const std::string& argv
("log-thread", optValue(thread,"yes|no"), "Include thread ID in log messages")
("log-function", optValue(function,"yes|no"), "Include function signature in log messages")
("log-hires-timestamp", optValue(hiresTs,"yes|no"), "Use hi-resolution timestamps in log messages")
+ ("log-category", optValue(category,"yes|no"), "Include category in log messages")
("log-prefix", optValue(prefix,"STRING"), "Prefix to append to all log messages")
;
add(*sinkOptions);
@@ -83,6 +93,7 @@ Options::Options(const Options &o) :
source(o.source),
function(o.function),
hiresTs(o.hiresTs),
+ category(o.category),
trace(o.trace),
prefix(o.prefix),
sinkOptions (SinkOptions::create(o.argv0))
@@ -101,11 +112,12 @@ Options& Options::operator=(const Option
source = x.source;
function = x.function;
hiresTs = x.hiresTs;
+ category = x.category;
trace = x.trace;
prefix = x.prefix;
*sinkOptions = *x.sinkOptions;
}
return *this;
}
-
+
}} // namespace qpid::log
Modified: qpid/branches/asyncstore/cpp/src/qpid/log/Selector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/log/Selector.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/log/Selector.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/log/Selector.cpp Fri Aug 3 12:13:32 2012
@@ -37,18 +37,29 @@ void Selector::enable(const string& enab
level=enableStr.substr(0,c);
pattern=enableStr.substr(c+1);
}
+ bool isCat = CategoryTraits::isCategory(pattern);
if (!level.empty() && level[level.size()-1]=='+') {
for (int i = LevelTraits::level(level.substr(0,level.size()-1));
i < LevelTraits::COUNT;
- ++i)
- enable(Level(i), pattern);
+ ++i) {
+ if (isCat) {
+ enable(Level(i), CategoryTraits::category(pattern));
+ } else {
+ enable(Level(i), pattern);
+ }
+ }
}
else {
- enable(LevelTraits::level(level), pattern);
+ if (isCat) {
+ enable(LevelTraits::level(level), CategoryTraits::category(pattern));
+ } else {
+ enable(LevelTraits::level(level), pattern);
+ }
}
}
Selector::Selector(const Options& opt){
+ reset();
for_each(opt.selectors.begin(), opt.selectors.end(),
boost::bind(&Selector::enable, this, _1));
}
@@ -58,11 +69,17 @@ bool Selector::isEnabled(Level level, co
for (std::vector<std::string>::iterator i=substrings[level].begin();
i != substrings[level].end();
++i)
- {
- if (std::search(function, functionEnd, i->begin(), i->end()) != functionEnd)
- return true;
- }
+ {
+ if (std::search(function, functionEnd, i->begin(), i->end()) != functionEnd)
+ return true;
+ }
return false;
}
+bool Selector::isEnabled(Level level, const char* function, Category category) {
+ if (catFlags[level][category])
+ return true;
+ return isEnabled(level, function);
+}
+
}} // namespace qpid::log
Modified: qpid/branches/asyncstore/cpp/src/qpid/log/Statement.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/log/Statement.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/log/Statement.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/log/Statement.cpp Fri Aug 3 12:13:32 2012
@@ -36,7 +36,7 @@ std::string quote(const std::string& str
size_t n = std::count_if(str.begin(), str.end(), nonPrint);
if (n==0) return str;
std::string ret;
- ret.reserve(str.size()+2*n); // Avoid extra allocations.
+ ret.reserve(str.size()+3*n); // Avoid extra allocations.
for (std::string::const_iterator i = str.begin(); i != str.end(); ++i) {
if (nonPrint(*i)) {
ret.push_back('\\');
@@ -50,10 +50,42 @@ std::string quote(const std::string& str
}
}
+//
+// Instance of name hints
+//
+static CategoryFileNameHints filenameHints;
+
+
+Category CategoryFileNameHints::categoryOf(const char* const fName) {
+ for (std::list<std::pair<const char* const, Category> >::iterator
+ it = filenameHints.hintList.begin();
+ it != filenameHints.hintList.end();
+ ++it) {
+ if (strstr(fName, (const char* const)it->first) != 0) {
+ return it->second;
+ }
+ }
+ return unspecified;
+}
+
+
+void Statement::categorize(Statement& s) {
+ // given a statement and it's category
+ // if the category is Unspecified then try to find a
+ // better category based on the path and file name.
+ if (s.category == log::unspecified) {
+ s.category = CategoryFileNameHints::categoryOf(s.file);
+ } else {
+ // already has a category so leave it alone
+ }
+}
+
+
void Statement::log(const std::string& message) {
Logger::instance().log(*this, quote(message));
}
+
Statement::Initializer::Initializer(Statement& s) : statement(s) {
// QPID-3891
// From the given BOOST_CURRENT_FUNCTION name extract only the
@@ -99,16 +131,22 @@ Statement::Initializer::Initializer(Stat
// no function-name pointer to process
}
+ Statement::categorize(s);
Logger::instance().add(s);
}
+
namespace {
const char* names[LevelTraits::COUNT] = {
"trace", "debug", "info", "notice", "warning", "error", "critical"
};
-} // namespace
+const char* catNames[CategoryTraits::COUNT] = {
+ "Security", "Broker", "Management", "Protocol", "System", "HA", "Messaging",
+ "Store", "Network", "Test", "Client", "Model", "Unspecified"
+};
+} // namespace
Level LevelTraits::level(const char* name) {
for (int i =0; i < LevelTraits::COUNT; ++i) {
if (strcmp(names[i], name)==0)
@@ -121,4 +159,23 @@ const char* LevelTraits::name(Level l) {
return names[l];
}
+bool CategoryTraits::isCategory(const std::string& name) {
+ for (int i =0; i < CategoryTraits::COUNT; ++i) {
+ if (strcmp(catNames[i], name.c_str())==0)
+ return true;
+ }
+ return false;
+}
+
+Category CategoryTraits::category(const char* name) {
+ for (int i =0; i < CategoryTraits::COUNT; ++i) {
+ if (strcmp(catNames[i], name)==0)
+ return Category(i);
+ }
+ throw std::runtime_error(std::string("Invalid log category name: ")+name);
+}
+
+const char* CategoryTraits::name(Category c) {
+ return catNames[c];
+}
}} // namespace qpid::log
Modified: qpid/branches/asyncstore/cpp/src/qpid/log/posix/SinkOptions.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/log/posix/SinkOptions.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/log/posix/SinkOptions.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/log/posix/SinkOptions.cpp Fri Aug 3 12:13:32 2012
@@ -22,11 +22,14 @@
#include "qpid/log/OstreamOutput.h"
#include "qpid/memory.h"
#include "qpid/Exception.h"
+
#include <iostream>
#include <map>
#include <string>
#include <syslog.h>
+#include <boost/lexical_cast.hpp>
+
using std::string;
using qpid::Exception;
@@ -90,7 +93,7 @@ public:
string name(int value) const {
ByValue::const_iterator i = byValue.find(value);
if (i == byValue.end())
- throw Exception("Not a valid syslog value: " + value);
+ throw Exception("Not a valid syslog value: " + boost::lexical_cast<string>(value));
return i->second;
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/management/Buffer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/management/Buffer.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/management/Buffer.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/management/Buffer.cpp Fri Aug 3 12:13:32 2012
@@ -29,12 +29,11 @@ namespace management {
Buffer::Buffer(char* data, uint32_t size) : impl(new framing::Buffer(data, size)) {}
Buffer::~Buffer() { delete impl; }
-void Buffer::record() { impl->record(); }
-void Buffer::restore(bool reRecord) { impl->restore(reRecord); }
void Buffer::reset() { impl->reset(); }
uint32_t Buffer::available() { return impl->available(); }
uint32_t Buffer::getSize() { return impl->getSize(); }
uint32_t Buffer::getPosition() { return impl->getPosition(); }
+void Buffer::setPosition(uint32_t p) { impl->setPosition(p); }
char* Buffer::getPointer() { return impl->getPointer(); }
void Buffer::putOctet(uint8_t i) { impl->putOctet(i); }
void Buffer::putShort(uint16_t i) { impl->putShort(i); }
Modified: qpid/branches/asyncstore/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/management/ManagementAgent.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/management/ManagementAgent.cpp Fri Aug 3 12:13:32 2012
@@ -1344,18 +1344,19 @@ void ManagementAgent::handleMethodReques
outBuffer.putLong (Manageable::STATUS_PARAMETER_INVALID);
outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_PARAMETER_INVALID));
}
- else
+ else {
+ uint32_t pos = outBuffer.getPosition();
try {
- outBuffer.record();
sys::Mutex::ScopedUnlock u(userLock);
string outBuf;
iter->second->doMethod(methodName, inArgs, outBuf, userId);
outBuffer.putRawData(outBuf);
} catch(exception& e) {
- outBuffer.restore();
+ outBuffer.setPosition(pos);;
outBuffer.putLong(Manageable::STATUS_EXCEPTION);
outBuffer.putMediumString(e.what());
}
+ }
}
outLen = MA_BUFFER_SIZE - outBuffer.available();
@@ -1662,11 +1663,11 @@ void ManagementAgent::handleSchemaRespon
string packageName;
SchemaClassKey key;
- inBuffer.record();
+ uint32_t pos = inBuffer.getPosition();
inBuffer.getOctet();
inBuffer.getShortString(packageName);
key.decode(inBuffer);
- inBuffer.restore();
+ inBuffer.setPosition(pos);;
QPID_LOG(debug, "RECV SchemaResponse class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << " seq=" << sequence);
@@ -2426,7 +2427,6 @@ size_t ManagementAgent::validateTableSch
uint8_t hash[16];
try {
- inBuffer.record();
uint8_t kind = inBuffer.getOctet();
if (kind != ManagementItem::CLASS_KIND_TABLE)
return 0;
@@ -2468,7 +2468,7 @@ size_t ManagementAgent::validateTableSch
}
end = inBuffer.getPosition();
- inBuffer.restore(); // restore original position
+ inBuffer.setPosition(start); // restore original position
return end - start;
}
@@ -2480,7 +2480,6 @@ size_t ManagementAgent::validateEventSch
uint8_t hash[16];
try {
- inBuffer.record();
uint8_t kind = inBuffer.getOctet();
if (kind != ManagementItem::CLASS_KIND_EVENT)
return 0;
@@ -2507,7 +2506,7 @@ size_t ManagementAgent::validateEventSch
}
end = inBuffer.getPosition();
- inBuffer.restore(); // restore original position
+ inBuffer.setPosition(start); // restore original position
return end - start;
}
Propchange: qpid/branches/asyncstore/cpp/src/qpid/management/ManagementAgent.cpp
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp:r1333988-1368650
Propchange: qpid/branches/asyncstore/cpp/src/qpid/management/ManagementAgent.h
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h:r1333988-1368650
Modified: qpid/branches/asyncstore/cpp/src/qpid/management/ManagementDirectExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/management/ManagementDirectExchange.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/management/ManagementDirectExchange.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/management/ManagementDirectExchange.cpp Fri Aug 3 12:13:32 2012
@@ -28,7 +28,7 @@ using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;
-ManagementDirectExchange::ManagementDirectExchange(const string& _name, Manageable* _parent, Broker* b) :
+ManagementDirectExchange::ManagementDirectExchange(const std::string& _name, Manageable* _parent, Broker* b) :
Exchange (_name, _parent, b),
DirectExchange(_name, _parent, b),
managementAgent(0) {}
@@ -43,7 +43,7 @@ ManagementDirectExchange::ManagementDire
void ManagementDirectExchange::route(Deliverable& msg)
{
bool routeIt = true;
- const string& routingKey = msg.getMessage().getRoutingKey();
+ const std::string& routingKey = msg.getMessage().getRoutingKey();
const FieldTable* args = msg.getMessage().getApplicationHeaders();
if (managementAgent)
Modified: qpid/branches/asyncstore/cpp/src/qpid/management/ManagementTopicExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/management/ManagementTopicExchange.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/management/ManagementTopicExchange.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/management/ManagementTopicExchange.cpp Fri Aug 3 12:13:32 2012
@@ -27,7 +27,7 @@ using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::sys;
-ManagementTopicExchange::ManagementTopicExchange(const string& _name, Manageable* _parent, Broker* b) :
+ManagementTopicExchange::ManagementTopicExchange(const std::string& _name, Manageable* _parent, Broker* b) :
Exchange (_name, _parent, b),
TopicExchange(_name, _parent, b),
managementAgent(0) {}
@@ -42,7 +42,7 @@ ManagementTopicExchange::ManagementTopic
void ManagementTopicExchange::route(Deliverable& msg)
{
bool routeIt = true;
- const string& routingKey = msg.getMessage().getRoutingKey();
+ const std::string& routingKey = msg.getMessage().getRoutingKey();
const FieldTable* args = msg.getMessage().getApplicationHeaders();
// Intercept management agent commands
@@ -54,7 +54,7 @@ void ManagementTopicExchange::route(Deli
}
bool ManagementTopicExchange::bind(Queue::shared_ptr queue,
- const string& routingKey,
+ const std::string& routingKey,
const qpid::framing::FieldTable* args)
{
if (qmfVersion == 1)
Modified: qpid/branches/asyncstore/cpp/src/qpid/messaging/PrivateImplRef.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/messaging/PrivateImplRef.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/messaging/PrivateImplRef.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/messaging/PrivateImplRef.h Fri Aug 3 12:13:32 2012
@@ -77,15 +77,15 @@ template <class T> class PrivateImplRef
/** Set the implementation pointer in a handle */
static void set(T& t, const intrusive_ptr& p) {
if (t.impl == p) return;
- if (t.impl) boost::intrusive_ptr_release(t.impl);
+ if (t.impl) intrusive_ptr_release(t.impl);
t.impl = p.get();
- if (t.impl) boost::intrusive_ptr_add_ref(t.impl);
+ if (t.impl) intrusive_ptr_add_ref(t.impl);
}
// Helper functions to implement the ctor, dtor, copy, assign
- static void ctor(T& t, Impl* p) { t.impl = p; if (p) boost::intrusive_ptr_add_ref(p); }
+ static void ctor(T& t, Impl* p) { t.impl = p; if (p) intrusive_ptr_add_ref(p); }
static void copy(T& t, const T& x) { if (&t == &x) return; t.impl = 0; assign(t, x); }
- static void dtor(T& t) { if(t.impl) boost::intrusive_ptr_release(t.impl); }
+ static void dtor(T& t) { if(t.impl) intrusive_ptr_release(t.impl); }
static T& assign(T& t, const T& x) { set(t, get(x)); return t;}
};
Modified: qpid/branches/asyncstore/cpp/src/qpid/replication/ReplicationExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/replication/ReplicationExchange.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/replication/ReplicationExchange.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/replication/ReplicationExchange.cpp Fri Aug 3 12:13:32 2012
@@ -184,7 +184,7 @@ bool ReplicationExchange::unbind(Queue::
throw NotImplementedException("Replication exchange does not support unbind operation");
}
-bool ReplicationExchange::isBound(Queue::shared_ptr /*queue*/, const string* const /*routingKey*/, const FieldTable* const /*args*/)
+bool ReplicationExchange::isBound(Queue::shared_ptr /*queue*/, const std::string* const /*routingKey*/, const FieldTable* const /*args*/)
{
return false;
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.cpp Fri Aug 3 12:13:32 2012
@@ -23,6 +23,7 @@
#include "qpid/sys/AsynchIO.h"
#include "qpid/sys/Socket.h"
#include "qpid/sys/SecuritySettings.h"
+#include "qpid/sys/Timer.h"
#include "qpid/framing/AMQP_HighestVersion.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/log/Statement.h"
@@ -41,11 +42,30 @@ struct Buff : public AsynchIO::BufferBas
{ delete [] bytes;}
};
-AsynchIOHandler::AsynchIOHandler(std::string id, ConnectionCodec::Factory* f) :
+struct ProtocolTimeoutTask : public sys::TimerTask {
+ AsynchIOHandler& handler;
+ std::string id;
+
+ ProtocolTimeoutTask(const std::string& i, const Duration& timeout, AsynchIOHandler& h) :
+ TimerTask(timeout, "ProtocolTimeout"),
+ handler(h),
+ id(i)
+ {}
+
+ void fire() {
+ // If this fires it means that we didn't negotiate the connection in the timeout period
+ // Schedule closing the connection for the io thread
+ QPID_LOG(error, "Connection " << id << " No protocol received closing");
+ handler.abort();
+ }
+};
+
+AsynchIOHandler::AsynchIOHandler(const std::string& id, ConnectionCodec::Factory* f) :
identifier(id),
aio(0),
factory(f),
codec(0),
+ reads(0),
readError(false),
isClient(false),
readCredit(InfiniteCredit)
@@ -54,12 +74,18 @@ AsynchIOHandler::AsynchIOHandler(std::st
AsynchIOHandler::~AsynchIOHandler() {
if (codec)
codec->closed();
+ if (timeoutTimerTask)
+ timeoutTimerTask->cancel();
delete codec;
}
-void AsynchIOHandler::init(AsynchIO* a, int numBuffs) {
+void AsynchIOHandler::init(qpid::sys::AsynchIO* a, qpid::sys::Timer& timer, uint32_t maxTime, int numBuffs) {
aio = a;
+ // Start timer for this connection
+ timeoutTimerTask = new ProtocolTimeoutTask(identifier, maxTime*TIME_MSEC, *this);
+ timer.add(timeoutTimerTask);
+
// Give connection some buffers to use
for (int i = 0; i < numBuffs; i++) {
aio->queueReadBuffer(new Buff);
@@ -129,10 +155,18 @@ void AsynchIOHandler::readbuff(AsynchIO&
}
}
+ ++reads;
size_t decoded = 0;
if (codec) { // Already initiated
try {
decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount);
+ // When we've decoded 3 reads (probably frames) we will have authenticated and
+ // started heartbeats, if specified, in many (but not all) cases so now we will cancel
+ // the idle connection timeout - this is really hacky, and would be better implemented
+ // in the connection, but that isn't actually created until the first decode.
+ if (reads == 3) {
+ timeoutTimerTask->cancel();
+ }
}catch(const std::exception& e){
QPID_LOG(error, e.what());
readError = true;
@@ -143,6 +177,7 @@ void AsynchIOHandler::readbuff(AsynchIO&
framing::ProtocolInitiation protocolInit;
if (protocolInit.decode(in)) {
decoded = in.getPosition();
+
QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")");
try {
codec = factory->create(protocolInit.getVersion(), *this, identifier, SecuritySettings());
@@ -202,6 +237,10 @@ void AsynchIOHandler::idle(AsynchIO&){
if (isClient && codec == 0) {
codec = factory->create(*this, identifier, SecuritySettings());
write(framing::ProtocolInitiation(codec->getVersion()));
+ // We've just sent the protocol negotiation so we can cancel the timeout for that
+ // This is not ideal, because we've not received anything yet, but heartbeats will
+ // be active soon
+ timeoutTimerTask->cancel();
return;
}
if (codec == 0) return;
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.h Fri Aug 3 12:13:32 2012
@@ -27,6 +27,8 @@
#include "qpid/sys/Mutex.h"
#include "qpid/CommonImportExport.h"
+#include <boost/intrusive_ptr.hpp>
+
namespace qpid {
namespace framing {
@@ -38,24 +40,28 @@ namespace sys {
class AsynchIO;
struct AsynchIOBufferBase;
class Socket;
+class Timer;
+class TimerTask;
class AsynchIOHandler : public OutputControl {
std::string identifier;
AsynchIO* aio;
ConnectionCodec::Factory* factory;
ConnectionCodec* codec;
+ uint32_t reads;
bool readError;
bool isClient;
AtomicValue<int32_t> readCredit;
static const int32_t InfiniteCredit = -1;
Mutex creditLock;
+ boost::intrusive_ptr<sys::TimerTask> timeoutTimerTask;
void write(const framing::ProtocolInitiation&);
public:
- QPID_COMMON_EXTERN AsynchIOHandler(std::string id, ConnectionCodec::Factory* f);
+ QPID_COMMON_EXTERN AsynchIOHandler(const std::string& id, qpid::sys::ConnectionCodec::Factory* f );
QPID_COMMON_EXTERN ~AsynchIOHandler();
- QPID_COMMON_EXTERN void init(AsynchIO* a, int numBuffs);
+ QPID_COMMON_EXTERN void init(AsynchIO* a, Timer& timer, uint32_t maxTime, int numBuffs);
QPID_COMMON_INLINE_EXTERN void setClient() { isClient = true; }
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/SslPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/SslPlugin.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/SslPlugin.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/SslPlugin.cpp Fri Aug 3 12:13:32 2012
@@ -39,6 +39,8 @@
namespace qpid {
namespace sys {
+class Timer;
+
using namespace qpid::sys::ssl;
struct SslServerOptions : ssl::SslOptions
@@ -68,6 +70,8 @@ class SslProtocolFactoryTmpl : public Pr
typedef SslAcceptorTmpl<T> SslAcceptor;
+ Timer& brokerTimer;
+ uint32_t maxNegotiateTime;
const bool tcpNoDelay;
T listener;
const uint16_t listeningPort;
@@ -75,7 +79,7 @@ class SslProtocolFactoryTmpl : public Pr
bool nodict;
public:
- SslProtocolFactoryTmpl(const SslServerOptions&, int backlog, bool nodelay);
+ SslProtocolFactoryTmpl(const SslServerOptions&, int backlog, bool nodelay, Timer& timer, uint32_t maxTime);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
void connect(Poller::shared_ptr, const std::string& host, const std::string& port,
ConnectionCodec::Factory*,
@@ -132,16 +136,18 @@ static struct SslPlugin : public Plugin
try {
ssl::initNSS(options, true);
nssInitialized = true;
-
+
const broker::Broker::Options& opts = broker->getOptions();
ProtocolFactory::shared_ptr protocol(options.multiplex ?
static_cast<ProtocolFactory*>(new SslMuxProtocolFactory(options,
opts.connectionBacklog,
- opts.tcpNoDelay)) :
+ opts.tcpNoDelay,
+ broker->getTimer(), opts.maxNegotiateTime)) :
static_cast<ProtocolFactory*>(new SslProtocolFactory(options,
opts.connectionBacklog,
- opts.tcpNoDelay)));
+ opts.tcpNoDelay,
+ broker->getTimer(), opts.maxNegotiateTime)));
QPID_LOG(notice, "Listening for " <<
(options.multiplex ? "SSL or TCP" : "SSL") <<
" connections on TCP port " <<
@@ -156,14 +162,16 @@ static struct SslPlugin : public Plugin
} sslPlugin;
template <class T>
-SslProtocolFactoryTmpl<T>::SslProtocolFactoryTmpl(const SslServerOptions& options, int backlog, bool nodelay) :
+SslProtocolFactoryTmpl<T>::SslProtocolFactoryTmpl(const SslServerOptions& options, int backlog, bool nodelay, Timer& timer, uint32_t maxTime) :
+ brokerTimer(timer),
+ maxNegotiateTime(maxTime),
tcpNoDelay(nodelay), listeningPort(listener.listen(options.port, backlog, options.certName, options.clientAuth)),
nodict(options.nodict)
{}
void SslEstablished(Poller::shared_ptr poller, const qpid::sys::SslSocket& s,
ConnectionCodec::Factory* f, bool isClient,
- bool tcpNoDelay, bool nodict) {
+ Timer& timer, uint32_t maxTime, bool tcpNoDelay, bool nodict) {
qpid::sys::ssl::SslHandler* async = new qpid::sys::ssl::SslHandler(s.getFullAddress(), f, nodict);
if (tcpNoDelay) {
@@ -183,7 +191,7 @@ void SslEstablished(Poller::shared_ptr p
boost::bind(&qpid::sys::ssl::SslHandler::nobuffs, async, _1),
boost::bind(&qpid::sys::ssl::SslHandler::idle, async, _1));
- async->init(aio, 4);
+ async->init(aio,timer, maxTime, 4);
aio->start(poller);
}
@@ -192,7 +200,7 @@ void SslProtocolFactory::established(Pol
ConnectionCodec::Factory* f, bool isClient) {
const SslSocket *sslSock = dynamic_cast<const SslSocket*>(&s);
- SslEstablished(poller, *sslSock, f, isClient, tcpNoDelay, nodict);
+ SslEstablished(poller, *sslSock, f, isClient, brokerTimer, maxNegotiateTime, tcpNoDelay, nodict);
}
template <class T>
@@ -216,7 +224,7 @@ void SslMuxProtocolFactory::established(
const SslSocket *sslSock = dynamic_cast<const SslSocket*>(&s);
if (sslSock) {
- SslEstablished(poller, *sslSock, f, isClient, tcpNoDelay, nodict);
+ SslEstablished(poller, *sslSock, f, isClient, brokerTimer, maxNegotiateTime, tcpNoDelay, nodict);
return;
}
@@ -239,7 +247,7 @@ void SslMuxProtocolFactory::established(
boost::bind(&AsynchIOHandler::nobuffs, async, _1),
boost::bind(&AsynchIOHandler::idle, async, _1));
- async->init(aio, 4);
+ async->init(aio, brokerTimer, maxNegotiateTime, 4);
aio->start(poller);
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/TCPIOPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/TCPIOPlugin.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/TCPIOPlugin.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/TCPIOPlugin.cpp Fri Aug 3 12:13:32 2012
@@ -36,14 +36,21 @@
namespace qpid {
namespace sys {
+class Timer;
+
class AsynchIOProtocolFactory : public ProtocolFactory {
- const bool tcpNoDelay;
boost::ptr_vector<Socket> listeners;
boost::ptr_vector<AsynchAcceptor> acceptors;
+ Timer& brokerTimer;
+ uint32_t maxNegotiateTime;
uint16_t listeningPort;
+ const bool tcpNoDelay;
public:
- AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay, bool shouldListen);
+ AsynchIOProtocolFactory(const std::string& host, const std::string& port,
+ int backlog, bool nodelay,
+ Timer& timer, uint32_t maxTime,
+ bool shouldListen);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
void connect(Poller::shared_ptr, const std::string& host, const std::string& port,
ConnectionCodec::Factory*,
@@ -90,6 +97,7 @@ static class TCPIOPlugin : public Plugin
"", boost::lexical_cast<std::string>(opts.port),
opts.connectionBacklog,
opts.tcpNoDelay,
+ broker->getTimer(), opts.maxNegotiateTime,
shouldListen));
if (shouldListen) {
@@ -101,7 +109,12 @@ static class TCPIOPlugin : public Plugin
}
} tcpPlugin;
-AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay, bool shouldListen) :
+AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, const std::string& port,
+ int backlog, bool nodelay,
+ Timer& timer, uint32_t maxTime,
+ bool shouldListen) :
+ brokerTimer(timer),
+ maxNegotiateTime(maxTime),
tcpNoDelay(nodelay)
{
if (!shouldListen) {
@@ -153,7 +166,7 @@ void AsynchIOProtocolFactory::establishe
boost::bind(&AsynchIOHandler::nobuffs, async, _1),
boost::bind(&AsynchIOHandler::idle, async, _1));
- async->init(aio, 4);
+ async->init(aio, brokerTimer, maxNegotiateTime, 4);
aio->start(poller);
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/Timer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/Timer.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/Timer.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/Timer.cpp Fri Aug 3 12:13:32 2012
@@ -35,7 +35,7 @@ TimerTask::TimerTask(Duration timeout, c
sortTime(AbsTime::FarFuture()),
period(timeout),
nextFireTime(AbsTime::now(), timeout),
- cancelled(false)
+ state(WAITING)
{}
TimerTask::TimerTask(AbsTime time, const std::string& n) :
@@ -43,7 +43,7 @@ TimerTask::TimerTask(AbsTime time, const
sortTime(AbsTime::FarFuture()),
period(0),
nextFireTime(time),
- cancelled(false)
+ state(WAITING)
{}
TimerTask::~TimerTask() {}
@@ -52,27 +52,48 @@ bool TimerTask::readyToFire() const {
return !(nextFireTime > AbsTime::now());
}
+bool TimerTask::prepareToFire() {
+ Monitor::ScopedLock l(stateMonitor);
+ if (state != CANCELLED) {
+ state = CALLING;
+ return true;
+ } else {
+ return false;
+ }
+}
+
void TimerTask::fireTask() {
- cancelled = true;
fire();
}
+void TimerTask::finishFiring() {
+ Monitor::ScopedLock l(stateMonitor);
+ if (state != CANCELLED) {
+ state = WAITING;
+ stateMonitor.notifyAll();
+ }
+}
+
// This can only be used to setup the next fire time. After the Timer has already fired
void TimerTask::setupNextFire() {
if (period && readyToFire()) {
nextFireTime = max(AbsTime::now(), AbsTime(nextFireTime, period));
- cancelled = false;
} else {
QPID_LOG(error, name << " couldn't setup next timer firing: " << Duration(nextFireTime, AbsTime::now()) << "[" << period << "]");
}
}
// Only allow tasks to be delayed
-void TimerTask::restart() { nextFireTime = max(nextFireTime, AbsTime(AbsTime::now(), period)); }
+void TimerTask::restart() {
+ nextFireTime = max(nextFireTime, AbsTime(AbsTime::now(), period));
+}
void TimerTask::cancel() {
- ScopedLock<Mutex> l(callbackLock);
- cancelled = true;
+ Monitor::ScopedLock l(stateMonitor);
+ while (state == CALLING) {
+ stateMonitor.wait();
+ }
+ state = CANCELLED;
}
void TimerTask::setFired() {
@@ -96,6 +117,22 @@ Timer::~Timer()
stop();
}
+class TimerTaskCallbackScope {
+ TimerTask& tt;
+public:
+ explicit TimerTaskCallbackScope(TimerTask& t) :
+ tt(t)
+ {}
+
+ operator bool() {
+ return !tt.prepareToFire();
+ }
+
+ ~TimerTaskCallbackScope() {
+ tt.finishFiring();
+ }
+};
+
// TODO AStitcher 21/08/09 The threshholds for emitting warnings are a little arbitrary
void Timer::run()
{
@@ -112,8 +149,8 @@ void Timer::run()
AbsTime start(AbsTime::now());
Duration delay(t->sortTime, start);
{
- ScopedLock<Mutex> l(t->callbackLock);
- if (t->cancelled) {
+ TimerTaskCallbackScope s(*t);
+ if (s) {
{
Monitor::ScopedUnlock u(monitor);
drop(t);
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/Timer.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/Timer.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/Timer.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/Timer.h Fri Aug 3 12:13:32 2012
@@ -40,6 +40,7 @@ class Timer;
class TimerTask : public RefCounted {
friend class Timer;
+ friend class TimerTaskCallbackScope;
friend bool operator<(const boost::intrusive_ptr<TimerTask>&,
const boost::intrusive_ptr<TimerTask>&);
@@ -47,9 +48,11 @@ class TimerTask : public RefCounted {
AbsTime sortTime;
Duration period;
AbsTime nextFireTime;
- Mutex callbackLock;
- volatile bool cancelled;
+ qpid::sys::Monitor stateMonitor;
+ enum {WAITING, CALLING, CANCELLED} state;
+ bool prepareToFire();
+ void finishFiring();
bool readyToFire() const;
void fireTask();
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp Fri Aug 3 12:13:32 2012
@@ -18,6 +18,7 @@
* under the License.
*
*/
+#include <unistd.h>
#include "qpid/sys/cyrus/CyrusSecurityLayer.h"
#include <algorithm>
#include "qpid/framing/reply_exceptions.h"
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/posix/LockFile.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/posix/LockFile.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/posix/LockFile.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/posix/LockFile.cpp Fri Aug 3 12:13:32 2012
@@ -46,7 +46,7 @@ LockFile::LockFile(const std::string& pa
errno = 0;
int flags=create ? O_WRONLY|O_CREAT|O_NOFOLLOW : O_RDWR;
int fd = ::open(path.c_str(), flags, 0644);
- if (fd < 0) throw ErrnoException("Cannot open " + path, errno);
+ if (fd < 0) throw ErrnoException("Cannot open lock file " + path, errno);
if (::lockf(fd, F_TLOCK, 0) < 0) {
::close(fd);
throw ErrnoException("Cannot lock " + path, errno);
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/posix/MemStat.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/posix/MemStat.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/posix/MemStat.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/posix/MemStat.cpp Fri Aug 3 12:13:32 2012
@@ -20,6 +20,7 @@
*/
#include "qpid/sys/MemStat.h"
+
#include <malloc.h>
void qpid::sys::MemStat::loadMemInfo(qmf::org::apache::qpid::broker::Memory* object)
@@ -35,4 +36,3 @@ void qpid::sys::MemStat::loadMemInfo(qmf
object->set_malloc_keepcost(info.keepcost);
}
-
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/posix/SocketAddress.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/posix/SocketAddress.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/posix/SocketAddress.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/posix/SocketAddress.cpp Fri Aug 3 12:13:32 2012
@@ -35,14 +35,16 @@ namespace sys {
SocketAddress::SocketAddress(const std::string& host0, const std::string& port0) :
host(host0),
port(port0),
- addrInfo(0)
+ addrInfo(0),
+ currentAddrInfo(0)
{
}
SocketAddress::SocketAddress(const SocketAddress& sa) :
host(sa.host),
port(sa.port),
- addrInfo(0)
+ addrInfo(0),
+ currentAddrInfo(0)
{
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/posix/SystemInfo.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/posix/SystemInfo.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/posix/SystemInfo.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/posix/SystemInfo.cpp Fri Aug 3 12:13:32 2012
@@ -18,10 +18,11 @@
*
*/
+#include "qpid/log/Statement.h"
#include "qpid/sys/SystemInfo.h"
-
#include "qpid/sys/posix/check.h"
-
+#include <set>
+#include <arpa/inet.h>
#include <sys/ioctl.h>
#include <sys/utsname.h>
#include <sys/types.h> // For FreeBSD
@@ -33,6 +34,7 @@
#include <fstream>
#include <sstream>
#include <netdb.h>
+#include <string.h>
#ifndef HOST_NAME_MAX
# define HOST_NAME_MAX 256
@@ -59,48 +61,100 @@ bool SystemInfo::getLocalHostname (Addre
return true;
}
-static const string LOCALHOST("127.0.0.1");
+static const string LOOPBACK("127.0.0.1");
static const string TCP("tcp");
+// Test IPv4 address for loopback
+inline bool IN_IS_ADDR_LOOPBACK(const ::in_addr* a) {
+ return ((ntohl(a->s_addr) & 0xff000000) == 0x7f000000);
+}
+
+inline bool isLoopback(const ::sockaddr* addr) {
+ switch (addr->sa_family) {
+ case AF_INET: return IN_IS_ADDR_LOOPBACK(&((const ::sockaddr_in*)(const void*)addr)->sin_addr);
+ case AF_INET6: return IN6_IS_ADDR_LOOPBACK(&((const ::sockaddr_in6*)(const void*)addr)->sin6_addr);
+ default: return false;
+ }
+}
+
void SystemInfo::getLocalIpAddresses (uint16_t port,
std::vector<Address> &addrList) {
::ifaddrs* ifaddr = 0;
QPID_POSIX_CHECK(::getifaddrs(&ifaddr));
for (::ifaddrs* ifap = ifaddr; ifap != 0; ifap = ifap->ifa_next) {
if (ifap->ifa_addr == 0) continue;
-
+ if (isLoopback(ifap->ifa_addr)) continue;
int family = ifap->ifa_addr->sa_family;
switch (family) {
- case AF_INET: {
- char dispName[NI_MAXHOST];
- int rc = ::getnameinfo(
- ifap->ifa_addr,
- (family == AF_INET)
- ? sizeof(struct sockaddr_in)
- : sizeof(struct sockaddr_in6),
- dispName, sizeof(dispName),
- 0, 0, NI_NUMERICHOST);
- if (rc != 0) {
- throw QPID_POSIX_ERROR(rc);
+ case AF_INET6: {
+ // Ignore link local addresses as:
+ // * The scope id is illegal in URL syntax
+ // * Clients won't be able to use a link local address
+ // without adding their own (potentially different) scope id
+ sockaddr_in6* sa6 = (sockaddr_in6*)(ifap->ifa_addr);
+ if (IN6_IS_ADDR_LINKLOCAL(&sa6->sin6_addr)) break;
+ // Fallthrough
}
- string addr(dispName);
- if (addr != LOCALHOST) {
- addrList.push_back(Address(TCP, addr, port));
- }
- break;
- }
- // TODO: Url parsing currently can't cope with IPv6 addresses so don't return them
- // when it can cope move this line to above "case AF_INET:"
- case AF_INET6:
- default:
+ case AF_INET: {
+ char dispName[NI_MAXHOST];
+ int rc = ::getnameinfo(
+ ifap->ifa_addr,
+ (family == AF_INET)
+ ? sizeof(struct sockaddr_in)
+ : sizeof(struct sockaddr_in6),
+ dispName, sizeof(dispName),
+ 0, 0, NI_NUMERICHOST);
+ if (rc != 0) {
+ throw QPID_POSIX_ERROR(rc);
+ }
+ string addr(dispName);
+ addrList.push_back(Address(TCP, addr, port));
+ break;
+ }
+ default:
continue;
}
}
- freeifaddrs(ifaddr);
+ ::freeifaddrs(ifaddr);
if (addrList.empty()) {
- addrList.push_back(Address(TCP, LOCALHOST, port));
+ addrList.push_back(Address(TCP, LOOPBACK, port));
+ }
+}
+
+namespace {
+struct AddrInfo {
+ struct addrinfo* ptr;
+ AddrInfo(const std::string& host) : ptr(0) {
+ ::addrinfo hints;
+ ::memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_UNSPEC; // Allow both IPv4 and IPv6
+ if (::getaddrinfo(host.c_str(), NULL, &hints, &ptr) != 0)
+ ptr = 0;
+ }
+ ~AddrInfo() { if (ptr) ::freeaddrinfo(ptr); }
+};
+}
+
+bool SystemInfo::isLocalHost(const std::string& host) {
+ std::vector<Address> myAddrs;
+ getLocalIpAddresses(0, myAddrs);
+ std::set<string> localHosts;
+ for (std::vector<Address>::const_iterator i = myAddrs.begin(); i != myAddrs.end(); ++i)
+ localHosts.insert(i->host);
+ // Resolve host
+ AddrInfo ai(host);
+ if (!ai.ptr) return false;
+ for (struct addrinfo *res = ai.ptr; res != NULL; res = res->ai_next) {
+ if (isLoopback(res->ai_addr)) return true;
+ // Get string form of IP addr
+ char addr[NI_MAXHOST] = "";
+ int error = ::getnameinfo(res->ai_addr, res->ai_addrlen, addr, NI_MAXHOST, NULL, 0,
+ NI_NUMERICHOST | NI_NUMERICSERV);
+ if (error) return false;
+ if (localHosts.find(addr) != localHosts.end()) return true;
}
+ return false;
}
void SystemInfo::getSystemId (std::string &osName,
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslHandler.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslHandler.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslHandler.cpp Fri Aug 3 12:13:32 2012
@@ -19,9 +19,9 @@
*
*/
#include "qpid/sys/ssl/SslHandler.h"
-
#include "qpid/sys/ssl/SslIo.h"
#include "qpid/sys/ssl/SslSocket.h"
+#include "qpid/sys/Timer.h"
#include "qpid/framing/AMQP_HighestVersion.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/log/Statement.h"
@@ -42,6 +42,24 @@ struct Buff : public SslIO::BufferBase {
{ delete [] bytes;}
};
+struct ProtocolTimeoutTask : public sys::TimerTask {
+ SslHandler& handler;
+ std::string id;
+
+ ProtocolTimeoutTask(const std::string& i, const Duration& timeout, SslHandler& h) :
+ TimerTask(timeout, "ProtocolTimeout"),
+ handler(h),
+ id(i)
+ {}
+
+ void fire() {
+ // If this fires it means that we didn't negotiate the connection in the timeout period
+ // Schedule closing the connection for the io thread
+ QPID_LOG(error, "Connection " << id << " No protocol received closing");
+ handler.abort();
+ }
+};
+
SslHandler::SslHandler(std::string id, ConnectionCodec::Factory* f, bool _nodict) :
identifier(id),
aio(0),
@@ -55,12 +73,18 @@ SslHandler::SslHandler(std::string id, C
SslHandler::~SslHandler() {
if (codec)
codec->closed();
+ if (timeoutTimerTask)
+ timeoutTimerTask->cancel();
delete codec;
}
-void SslHandler::init(SslIO* a, int numBuffs) {
+void SslHandler::init(SslIO* a, Timer& timer, uint32_t maxTime, int numBuffs) {
aio = a;
+ // Start timer for this connection
+ timeoutTimerTask = new ProtocolTimeoutTask(identifier, maxTime*TIME_MSEC, *this);
+ timer.add(timeoutTimerTask);
+
// Give connection some buffers to use
for (int i = 0; i < numBuffs; i++) {
aio->queueReadBuffer(new Buff);
@@ -80,8 +104,10 @@ void SslHandler::write(const framing::Pr
}
void SslHandler::abort() {
- // TODO: can't implement currently as underlying functionality not implemented
- // aio->requestCallback(boost::bind(&SslHandler::eof, this, _1));
+ // Don't disconnect if we're already disconnecting
+ if (!readError) {
+ aio->requestCallback(boost::bind(&SslHandler::eof, this, _1));
+ }
}
void SslHandler::activateOutput() {
aio->notifyPendingWrite();
@@ -109,6 +135,9 @@ void SslHandler::readbuff(SslIO& , SslIO
framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
framing::ProtocolInitiation protocolInit;
if (protocolInit.decode(in)) {
+ // We've just got the protocol negotiation so we can cancel the timeout for that
+ timeoutTimerTask->cancel();
+
decoded = in.getPosition();
QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")");
try {
@@ -169,6 +198,10 @@ void SslHandler::idle(SslIO&){
if (isClient && codec == 0) {
codec = factory->create(*this, identifier, getSecuritySettings(aio));
write(framing::ProtocolInitiation(codec->getVersion()));
+ // We've just sent the protocol negotiation so we can cancel the timeout for that
+ // This is not ideal, because we've not received anything yet, but heartbeats will
+ // be active soon
+ timeoutTimerTask->cancel();
return;
}
if (codec == 0) return;
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslHandler.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslHandler.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslHandler.h Fri Aug 3 12:13:32 2012
@@ -25,6 +25,8 @@
#include "qpid/sys/ConnectionCodec.h"
#include "qpid/sys/OutputControl.h"
+#include <boost/intrusive_ptr.hpp>
+
namespace qpid {
namespace framing {
@@ -32,6 +34,10 @@ namespace framing {
}
namespace sys {
+
+class Timer;
+class TimerTask;
+
namespace ssl {
class SslIO;
@@ -46,6 +52,7 @@ class SslHandler : public OutputControl
bool readError;
bool isClient;
bool nodict;
+ boost::intrusive_ptr<sys::TimerTask> timeoutTimerTask;
void write(const framing::ProtocolInitiation&);
qpid::sys::SecuritySettings getSecuritySettings(SslIO* aio);
@@ -53,7 +60,7 @@ class SslHandler : public OutputControl
public:
SslHandler(std::string id, ConnectionCodec::Factory* f, bool nodict);
~SslHandler();
- void init(SslIO* a, int numBuffs);
+ void init(SslIO* a, Timer& timer, uint32_t maxTime, int numBuffs);
void setClient() { isClient = true; }
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslIo.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslIo.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslIo.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslIo.cpp Fri Aug 3 12:13:32 2012
@@ -257,6 +257,18 @@ void SslIO::queueWriteClose() {
DispatchHandle::rewatchWrite();
}
+void SslIO::requestCallback(RequestCallback callback) {
+ // TODO creating a function object every time isn't all that
+ // efficient - if this becomes heavily used do something better (what?)
+ assert(callback);
+ DispatchHandle::call(boost::bind(&SslIO::requestedCall, this, callback));
+}
+
+void SslIO::requestedCall(RequestCallback callback) {
+ assert(callback);
+ callback(*this);
+}
+
/** Return a queued buffer if there are enough
* to spare
*/
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslIo.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslIo.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslIo.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslIo.h Fri Aug 3 12:13:32 2012
@@ -125,6 +125,7 @@ public:
typedef boost::function2<void, SslIO&, const SslSocket&> ClosedCallback;
typedef boost::function1<void, SslIO&> BuffersEmptyCallback;
typedef boost::function1<void, SslIO&> IdleCallback;
+ typedef boost::function1<void, SslIO&> RequestCallback;
private:
@@ -159,6 +160,7 @@ public:
void notifyPendingWrite();
void queueWriteClose();
bool writeQueueEmpty() { return writeQueue.empty(); }
+ void requestCallback(RequestCallback);
BufferBase* getQueuedBuffer();
qpid::sys::SecuritySettings getSecuritySettings();
@@ -168,6 +170,7 @@ private:
void readable(qpid::sys::DispatchHandle& handle);
void writeable(qpid::sys::DispatchHandle& handle);
void disconnected(qpid::sys::DispatchHandle& handle);
+ void requestedCall(RequestCallback);
void close(qpid::sys::DispatchHandle& handle);
};
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/unordered_map.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/unordered_map.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/unordered_map.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/unordered_map.h Fri Aug 3 12:13:32 2012
@@ -23,6 +23,8 @@
#ifdef _MSC_VER
# include <unordered_map>
+#elif defined(__SUNPRO_CC)
+# include <boost/tr1/unordered_map.hpp>
#else
# include <tr1/unordered_map>
#endif /* _MSC_VER */
Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/windows/IocpPoller.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/windows/IocpPoller.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/windows/IocpPoller.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/windows/IocpPoller.cpp Fri Aug 3 12:13:32 2012
@@ -96,6 +96,7 @@ void Poller::shutdown() {
// Allow sloppy code to shut us down more than once.
if (impl->isShutdown)
return;
+ impl->isShutdown = true;
ULONG_PTR key = 1; // Tell wait() it's a shutdown, not I/O
PostQueuedCompletionStatus(impl->iocp, 0, key, 0);
}
@@ -110,7 +111,7 @@ bool Poller::interrupt(PollerHandle&) {
}
void Poller::run() {
- do {
+ while (!impl->isShutdown) {
Poller::Event event = this->wait();
// Handle shutdown
@@ -124,7 +125,7 @@ void Poller::run() {
// This should be impossible
assert(false);
}
- } while (true);
+ }
}
void Poller::monitorHandle(PollerHandle& handle, Direction dir) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org