You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2012/08/03 14:14:12 UTC

svn commit: r1368910 [8/27] - in /qpid/branches/asyncstore: ./ bin/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf2/ruby/ cpp/bindings/qpid/python/ cpp/bindings/qpid/ruby/ cpp/bindings/qpid/ruby/features/ cpp/bindings/qpid/ruby/features/step_definitions/ cpp/...

Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/ReplicatingSubscription.cpp Fri Aug  3 12:13:32 2012
@@ -19,13 +19,18 @@
  *
  */
 
+#include "QueueGuard.h"
+#include "QueueRange.h"
+#include "QueueReplicator.h"
 #include "ReplicatingSubscription.h"
+#include "Primary.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/SessionContext.h"
 #include "qpid/broker/ConnectionState.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/log/Statement.h"
+#include "qpid/types/Uuid.h"
 #include <sstream>
 
 namespace qpid {
@@ -34,19 +39,90 @@ namespace ha {
 using namespace framing;
 using namespace broker;
 using namespace std;
+using sys::Mutex;
 
-const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.replicating-subscription");
+const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.ha-replicating-subscription");
+const string ReplicatingSubscription::QPID_BACK("qpid.ha-back");
+const string ReplicatingSubscription::QPID_FRONT("qpid.ha-front");
+const string ReplicatingSubscription::QPID_BROKER_INFO("qpid.ha-broker-info");
 
 namespace {
 const string DOLLAR("$");
 const string INTERNAL("-internal");
 } // namespace
 
+// Functor passed to Queue::eachMessage: finds gaps in [front, back] and adds them to the subscription's dequeued set.
+class DequeueScanner
+{
+  public:
+    DequeueScanner(
+        ReplicatingSubscription& rs,
+        SequenceNumber front_,
+        SequenceNumber back_    // Inclusive
+    ) : subscription(rs), front(front_), back(back_)
+    {
+        assert(front <= back);
+        // INVARIANT: gaps at positions <= at have already been reported as dequeues.
+        at = front - 1;  // Nothing in the range has been examined yet.
+    }
+
+    void operator()(const QueuedMessage& qm) {  // Invoked for each message visited by the scan.
+        if (qm.position >= front && qm.position <= back) {  // Messages outside the range are ignored.
+            if (qm.position > at+1) subscription.dequeued(at+1, qm.position-1);  // Positions skipped since 'at' are a gap.
+            at = qm.position;
+        }
+    }
+
+    // Must call after scanning the queue: reports the trailing gap (at, back], if any.
+    void finish() {
+        if (at < back) subscription.dequeued(at+1, back);
+    }
+
+  private:
+    ReplicatingSubscription& subscription;
+    SequenceNumber front;
+    SequenceNumber back;
+    SequenceNumber at;
+};
+
 string mask(const string& in)
 {
     return DOLLAR + in + INTERNAL;
 }
 
+
+/** Dummy consumer used by getNext() to find a message position on the queue; every callback is a no-op or returns a constant. */
+class GetPositionConsumer : public Consumer
+{
+  public:
+    GetPositionConsumer() :
+        Consumer("ha.GetPositionConsumer."+types::Uuid(true).str(), false) {}  // UUID suffix presumably keeps the name unique - confirm.
+    bool deliver(broker::QueuedMessage& ) { return true; }  // The message itself is never looked at.
+    void notify() {}
+    bool filter(boost::intrusive_ptr<broker::Message>) { return true; }
+    bool accept(boost::intrusive_ptr<broker::Message>) { return true; }
+    void cancel() {}
+    void acknowledged(const broker::QueuedMessage&) {}
+    bool browseAcquired() const { return true; }
+    broker::OwnershipToken* getSession() { return 0; }  // No owning session.
+};
+
+
+bool ReplicatingSubscription::getNext(  // Get position of the next message after 'from'; false if none found.
+    broker::Queue& q, SequenceNumber from, SequenceNumber& result)
+{
+    boost::shared_ptr<Consumer> c(new GetPositionConsumer);
+    c->setPosition(from);  // Start the dummy consumer at 'from'.
+    if (!q.dispatch(c)) return false;  // Nothing was dispatched: 'result' is left untouched.
+    result = c->getPosition();  // Consumer position after the dispatch.
+    return true;
+}
+
+bool ReplicatingSubscription::getFront(broker::Queue& q, SequenceNumber& front) {  // Position of the front message; false if the queue is empty.
+    // FIXME aconway 2012-05-23: does not handle sequence-number wrap-around; assumes 0 is < all messages in queue.
+    return getNext(q, 0, front);
+}
+
 /* Called by SemanticState::consume to create a consumer */
 boost::shared_ptr<broker::SemanticState::ConsumerImpl>
 ReplicatingSubscription::Factory::create(
@@ -66,7 +142,7 @@ ReplicatingSubscription::Factory::create
         rs.reset(new ReplicatingSubscription(
                      parent, name, queue, ack, acquire, exclusive, tag,
                      resumeId, resumeTtl, arguments));
-        queue->addObserver(rs);
+        rs->initialize();
     }
     return rs;
 }
@@ -84,179 +160,223 @@ ReplicatingSubscription::ReplicatingSubs
     const framing::FieldTable& arguments
 ) : ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag,
                  resumeId, resumeTtl, arguments),
-    events(new Queue(mask(name))),
-    consumer(new DelegatingConsumer(*this))
+    dummy(new Queue(mask(name))),
+    ready(false)
 {
-    // Separate the remote part from a "local-remote" address.
-    string address = parent->getSession().getConnection().getUrl();
-    size_t i = address.find('-');
-    if (i != string::npos) address = address.substr(i+1);
-    logPrefix = "HA: Primary ";
-    stringstream ss;
-    logSuffix = " (" + address + ")";
-
-    // FIXME aconway 2011-12-09: Failover optimization removed.
-    // There was code here to re-use messages already on the backup
-    // during fail-over. This optimization was removed to simplify
-    // the logic till we get the basic replication stable, it
-    // can be re-introduced later. Last revision with the optimization:
-    // r1213258 | QPID-3603: Fix QueueReplicator subscription parameters.
-
-    QPID_LOG(debug, logPrefix << "created backup subscription " << getName() << logSuffix);
-
-    // FIXME aconway 2011-12-15: ConsumerImpl::position is left at 0
-    // so we will start consuming from the lowest numbered message.
-    // This is incorrect if the sequence number wraps around, but
-    // this is what all consumers currently do.
-}
-
-// Message is delivered in the subscription's connection thread.
-bool ReplicatingSubscription::deliver(QueuedMessage& m) {
     try {
-        // Add position events for the subscribed queue, not for the internal event queue.
-        if (m.queue && m.queue == getQueue().get()) {
-            sys::Mutex::ScopedLock l(lock);
-            if (position != m.position)
-                throw Exception(
-                    QPID_MSG("Expected position " << position
-                             << " but got " << m.position));
-            // m.position is the position of the newly enqueued m on the local queue.
-            // backupPosition is latest position on the backup queue (before enqueueing m.)
-            if (m.position <= backupPosition)
-                throw Exception(
-                    QPID_MSG("Expected position >  " << backupPosition
-                             << " but got " << m.position));
-
-            if (m.position - backupPosition > 1) {
-                // Position has advanced because of messages dequeued ahead of us.
-                SequenceNumber send(m.position);
-                --send;   // Send the position before m was enqueued.
-                sendPositionEvent(send, l);
+        FieldTable ft;
+        if (!arguments.getTable(ReplicatingSubscription::QPID_BROKER_INFO, ft))
+            throw Exception("Replicating subscription does not have broker info: " + tag);
+        info.assign(ft);
+
+        // Set a log prefix message that identifies the remote broker.
+        ostringstream os;
+        os << "Primary " << queue->getName() << "@" << info.getLogId() << ": ";
+        logPrefix = os.str();
+
+        // NOTE: Once the guard is attached we can have concurrent
+        // calls to dequeued so we need to lock use of this->dequeues.
+        //
+        // However we must attach the guard _before_ we scan for
+        // initial dequeues to be sure we don't miss any dequeues
+        // between the scan and attaching the guard.
+        if (Primary::get()) guard = Primary::get()->getGuard(queue, info);
+        if (!guard) guard.reset(new QueueGuard(*queue, info));
+        guard->attach(*this);
+
+        QueueRange backup(arguments); // Remote backup range.
+        QueueRange primary(guard->getRange()); // Unguarded range when the guard was set.
+        backupPosition = backup.back;
+
+        // Sync backup and primary queues, don't send messages already on the backup
+
+        if (backup.front > primary.front || // Missing messages at front
+            backup.back < primary.front ||  // No overlap
+            primary.empty() || backup.empty()) // Empty
+        {
+            // No useful overlap - erase backup and start from the beginning
+            if (!backup.empty()) dequeued(backup.front, backup.back);
+            position = primary.front-1;
+        }
+        else {  // backup and primary do overlap.
+            // Remove messages from backup that are not in primary.
+            if (primary.back < backup.back) {
+                dequeued(primary.back+1, backup.back); // Trim excess messages at back
+                backup.back = primary.back;
+            }
+            if (backup.front < primary.front) {
+                dequeued(backup.front, primary.front-1); // Trim excess messages at front
+                backup.front = primary.front;
             }
-            backupPosition = m.position;
-            QPID_LOG(trace, logPrefix << "replicating " << m << logSuffix);
+            DequeueScanner scan(*this, backup.front, backup.back);
+            // FIXME aconway 2012-06-15: Optimize queue traversal, only in range.
+            queue->eachMessage(boost::ref(scan)); // Remove missing messages in between.
+            scan.finish();
+            position = backup.back;
         }
-        return ConsumerImpl::deliver(m);
-    } catch (const std::exception& e) {
-        QPID_LOG(critical, logPrefix << "error replicating " << getQueue()->getName()
-                 << logSuffix << ": " << e.what());
+        // NOTE: we are assuming that the messages that are on the backup are
+        // consistent with those on the primary. If the backup is a replica
+        // queue and hasn't been tampered with then that will be the case.
+
+        QPID_LOG(debug, logPrefix << "Subscribed:"
+                 << " backup:" << backup
+                 << " primary:" << primary
+                 << " catch-up: " << position << "-" << primary.back
+                 << "(" << primary.back-position << ")");
+
+        // Check if we are ready yet.
+        if (guard->subscriptionStart(position)) setReady();
+    }
+    catch (const std::exception& e) {
+        QPID_LOG(error, logPrefix << "Creation error: " << e.what()
+                 << ": arguments=" << getArguments());
         throw;
     }
 }
 
-ReplicatingSubscription::~ReplicatingSubscription() {}
-
+ReplicatingSubscription::~ReplicatingSubscription() {  // Only logs at debug level; no explicit cleanup is done here.
+    QPID_LOG(debug, logPrefix << "Destroyed replicating subscription");
+}
 
-// INVARIANT: delayed contains msg <=> we have outstanding startCompletion on msg
+// Called in subscription's connection thread when the subscription is created.
+// Called separately from the ctor because sending events requires
+// shared_from_this, i.e. a shared_ptr to this must already exist.
+//
+void ReplicatingSubscription::initialize() {
+    try {
+        Mutex::ScopedLock l(lock); // Note dequeued() can be called concurrently.
 
-// Mark a message completed. May be called by acknowledge or dequeued
-void ReplicatingSubscription::complete(
-    const QueuedMessage& qm, const sys::Mutex::ScopedLock&)
-{
-    // Handle completions for the subscribed queue, not the internal event queue.
-    if (qm.queue && qm.queue == getQueue().get()) {
-        QPID_LOG(trace, logPrefix << "completed " << qm << logSuffix);
-        Delayed::iterator i= delayed.find(qm.position);
-        // The same message can be completed twice, by acknowledged and
-        // dequeued, remove it from the set so it only gets completed
-        // once.
-        if (i != delayed.end()) {
-            assert(i->second.payload == qm.payload);
-            qm.payload->getIngressCompletion().finishCompleter();
-            delayed.erase(i);
-        }
+        // Send initial dequeues and position to the backup.
+        // There must be a shared_ptr(this) when sending.
+        sendDequeueEvent(l);
+        sendPositionEvent(position, l);
+        backupPosition = position;
+    }
+    catch (const std::exception& e) {
+        QPID_LOG(error, logPrefix << "Initialization error: " << e.what()
+                 << ": arguments=" << getArguments());
+        throw;
     }
 }
 
-// Called before we get notified of the message being available and
-// under the message lock in the queue. Called in arbitrary connection thread.
-void ReplicatingSubscription::enqueued(const QueuedMessage& qm) {
-    sys::Mutex::ScopedLock l(lock);
-    // Delay completion
-    QPID_LOG(trace, logPrefix << "delaying completion of " << qm << logSuffix);
-    qm.payload->getIngressCompletion().startCompleter();
-    assert(delayed.find(qm.position) == delayed.end());
-    delayed[qm.position] = qm;
+// Message is delivered in the subscription's connection thread.
+bool ReplicatingSubscription::deliver(QueuedMessage& qm) {
+    try {
+        // Add position events for the subscribed queue, not the internal event queue.
+        if (qm.queue == getQueue().get()) {
+            QPID_LOG(trace, logPrefix << "Replicating " << qm);
+            {
+                Mutex::ScopedLock l(lock);
+                assert(position == qm.position);
+                // qm.position is the position of the newly enqueued qm on local queue.
+                // backupPosition is latest position on backup queue before enqueueing
+                if (qm.position <= backupPosition)
+                    throw Exception(
+                        QPID_MSG("Expected position >  " << backupPosition
+                                 << " but got " << qm.position));
+                if (qm.position - backupPosition > 1) {
+                    // Position has advanced because of messages dequeued ahead of us.
+                    // Send the position before qm was enqueued.
+                    sendPositionEvent(qm.position-1, l);
+                }
+                // Backup will automatically advance by 1 on delivery of message.
+                backupPosition = qm.position;
+            }
+        }
+        return ConsumerImpl::deliver(qm);
+    } catch (const std::exception& e) {
+        QPID_LOG(critical, logPrefix << "Error replicating " << qm
+                 << ": " << e.what());
+        throw;
+    }
 }
 
-
-// Function to complete a delayed message, called by cancel()
-void ReplicatingSubscription::cancelComplete(
-    const Delayed::value_type& v, const sys::Mutex::ScopedLock&)
-{
-    QPID_LOG(trace, logPrefix << "cancel completed " << v.second << logSuffix);
-    v.second.payload->getIngressCompletion().finishCompleter();
+void ReplicatingSubscription::setReady() {
+    {
+        Mutex::ScopedLock l(lock);
+        if (ready) return;
+        ready = true;
+    }
+    // Notify Primary that a subscription is ready.
+    QPID_LOG(debug, logPrefix << "Caught up");
+    if (Primary::get()) Primary::get()->readyReplica(*this);
 }
 
 // Called in the subscription's connection thread.
 void ReplicatingSubscription::cancel()
 {
-    getQueue()->removeObserver(
-        boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
-    {
-        sys::Mutex::ScopedLock l(lock);
-        QPID_LOG(debug, logPrefix << "cancel backup subscription " << getName() << logSuffix);
-        for_each(delayed.begin(), delayed.end(),
-                 boost::bind(&ReplicatingSubscription::cancelComplete, this, _1, boost::ref(l)));
-        delayed.clear();
-    }
+    guard->cancel();
     ConsumerImpl::cancel();
 }
 
-// Called on primary in the backups IO thread.
-void ReplicatingSubscription::acknowledged(const QueuedMessage& msg) {
-    sys::Mutex::ScopedLock l(lock);
-    // Finish completion of message, it has been acknowledged by the backup.
-    complete(msg, l);
+// Consumer override, called on primary in the backup's IO thread.
+void ReplicatingSubscription::acknowledged(const QueuedMessage& qm) {
+    if (qm.queue == getQueue().get()) { // Don't complete messages on the internal queue
+        // Finish completion of message, it has been acknowledged by the backup.
+        QPID_LOG(trace, logPrefix << "Acknowledged " << qm);
+        guard->complete(qm);
+        // If next message is protected by the guard then we are ready
+        if (qm.position >= guard->getRange().back) setReady();
+    }
+    ConsumerImpl::acknowledged(qm);
 }
 
-// Hide the "queue deleted" error for a ReplicatingSubscription when a
-// queue is deleted, this is normal and not an error.
-bool ReplicatingSubscription::hideDeletedError() { return true; }
-
 // Called with lock held. Called in subscription's connection thread.
-void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock& l)
+void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock&)
 {
-    QPID_LOG(trace, logPrefix << "sending dequeues " << dequeues
-             << " from " << getQueue()->getName() << logSuffix);
+    if (dequeues.empty()) return;
+    QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues);
     string buf(dequeues.encodedSize(),'\0');
     framing::Buffer buffer(&buf[0], buf.size());
     dequeues.encode(buffer);
     dequeues.clear();
     buffer.reset();
-    sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l);
+    {
+        Mutex::ScopedUnlock u(lock);
+        sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer);
+    }
 }
 
-// Called after the message has been removed from the deque and under
-// the messageLock in the queue. Called in arbitrary connection threads.
+// Called via QueueObserver::dequeued override on guard.
+// Called after the message has been removed
+// from the deque and under the messageLock in the queue. Called in
+// arbitrary connection threads.
 void ReplicatingSubscription::dequeued(const QueuedMessage& qm)
 {
+    assert (qm.queue == getQueue().get());
+    QPID_LOG(trace, logPrefix << "Dequeued " << qm);
     {
-        sys::Mutex::ScopedLock l(lock);
-        QPID_LOG(trace, logPrefix << "dequeued " << qm << logSuffix);
+        Mutex::ScopedLock l(lock);
         dequeues.add(qm.position);
-        // If we have not yet sent this message to the backup, then
-        // complete it now as it will never be accepted.
-        if (qm.position > position) complete(qm, l);
     }
     notify();                   // Ensure a call to doDispatch
 }
 
+// Called during construction while scanning for initial dequeues: records the inclusive range [first, last].
+void ReplicatingSubscription::dequeued(SequenceNumber first, SequenceNumber last) {
+    QPID_LOG(trace, logPrefix << "Initial dequeue [" << first << ", " << last << "]");
+    {
+        Mutex::ScopedLock l(lock);  // The guard is already attached, so dequeued(qm) can run concurrently.
+        dequeues.add(first,last);  // Only recorded here; sent to the backup later by sendDequeueEvent().
+    }
+}
+
 // Called with lock held. Called in subscription's connection thread.
-void ReplicatingSubscription::sendPositionEvent(
-    SequenceNumber position, const sys::Mutex::ScopedLock&l )
+void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, Mutex::ScopedLock&)
 {
-    QPID_LOG(trace, logPrefix << "sending position " << position
-             << ", was " << backupPosition << logSuffix);
-    string buf(backupPosition.encodedSize(),'\0');
+    if (pos == backupPosition) return; // No need to send.
+    QPID_LOG(trace, logPrefix << "Sending position " << pos << ", was " << backupPosition);
+    string buf(pos.encodedSize(),'\0');
     framing::Buffer buffer(&buf[0], buf.size());
-    position.encode(buffer);
+    pos.encode(buffer);
     buffer.reset();
-    sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer, l);
+    {
+        Mutex::ScopedUnlock u(lock);
+        sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer);
+    }
 }
 
-void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& buffer,
-                                        const sys::Mutex::ScopedLock&)
+void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& buffer)
 {
     //generate event message
     boost::intrusive_ptr<Message> event = new Message();
@@ -276,15 +396,14 @@ void ReplicatingSubscription::sendEvent(
     event->getFrames().append(header);
     event->getFrames().append(content);
 
-    DeliveryProperties* props = event->getFrames().getHeaders()->get<DeliveryProperties>(true);
+    DeliveryProperties* props =
+        event->getFrames().getHeaders()->get<DeliveryProperties>(true);
     props->setRoutingKey(key);
-    // Send the event using the events queue. Consumer is a
-    // DelegatingConsumer that delegates to *this for everything but
-    // has an independnet position. We put an event on events and
-    // dispatch it through ourselves to send it in line with the
-    // normal browsing messages.
-    events->deliver(event);
-    events->dispatch(consumer);
+    // Send the event directly to the base consumer implementation.
+    // We don't really need a queue here but we pass a dummy queue
+    // to conform to the consumer API.
+    QueuedMessage qm(dummy.get(), event);
+    ConsumerImpl::deliver(qm);
 }
 
 
@@ -292,19 +411,10 @@ void ReplicatingSubscription::sendEvent(
 bool ReplicatingSubscription::doDispatch()
 {
     {
-        sys::Mutex::ScopedLock l(lock);
+        Mutex::ScopedLock l(lock);
         if (!dequeues.empty()) sendDequeueEvent(l);
     }
     return ConsumerImpl::doDispatch();
 }
 
-ReplicatingSubscription::DelegatingConsumer::DelegatingConsumer(ReplicatingSubscription& c) : Consumer(c.getName(), true), delegate(c) {}
-ReplicatingSubscription::DelegatingConsumer::~DelegatingConsumer() {}
-bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m) { return delegate.deliver(m); }
-void ReplicatingSubscription::DelegatingConsumer::notify() { delegate.notify(); }
-bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message> msg) { return delegate.filter(msg); }
-bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message> msg) { return delegate.accept(msg); }
-bool ReplicatingSubscription::DelegatingConsumer::browseAcquired() const { return delegate.browseAcquired(); }
-OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { return delegate.getSession(); }
-
 }} // namespace qpid::ha

Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/ReplicatingSubscription.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/ReplicatingSubscription.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/ReplicatingSubscription.h Fri Aug  3 12:13:32 2012
@@ -22,10 +22,10 @@
  *
  */
 
-#include "QueueReplicator.h"    // For DEQUEUE_EVENT_KEY
+#include "BrokerInfo.h"
 #include "qpid/broker/SemanticState.h"
-#include "qpid/broker/QueueObserver.h"
 #include "qpid/broker/ConsumerFactory.h"
+#include "qpid/types/Uuid.h"
 #include <iosfwd>
 
 namespace qpid {
@@ -42,18 +42,27 @@ class Buffer;
 }
 
 namespace ha {
+class QueueGuard;
 
 /**
- * A susbcription that represents a backup replicating a queue.
+ * A subscription that replicates to a remote backup.
  *
- * Runs on the primary. Delays completion of messages till the backup
- * has acknowledged, informs backup of locally dequeued messages.
+ * Runs on the primary. In conjunction with a QueueGuard, delays completion of
+ * messages till the backup has acknowledged, informs backup of locally dequeued
+ * messages.
+ *
+ * A ReplicatingSubscription is "ready" when all the messages on the queue have
+ * either been acknowledged by the backup, or are protected by the queue guard.
+ * On a primary broker the ReplicatingSubscription calls Primary::readyReplica
+ * when it is ready.
+ *
+ * THREAD SAFE: Called in subscription's connection thread but also in arbitrary
+ * connection threads via dequeued.
+ *
+ * Lifecycle: broker::Queue holds shared_ptrs to this as a consumer.
  *
- * THREAD SAFE: Used as a consumer in subscription's connection
- * thread, and as a QueueObserver in arbitrary connection threads.
  */
-class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
-                                public broker::QueueObserver
+class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl
 {
   public:
     struct Factory : public broker::ConsumerFactory {
@@ -67,6 +76,20 @@ class ReplicatingSubscription : public b
 
     // Argument names for consume command.
     static const std::string QPID_REPLICATING_SUBSCRIPTION;
+    static const std::string QPID_BACK;
+    static const std::string QPID_FRONT;
+    static const std::string QPID_BROKER_INFO;
+
+    // TODO aconway 2012-05-23: these don't belong on ReplicatingSubscription
+    /** Get position of front message on queue.
+     *@return false if queue is empty.
+     */
+    static bool getFront(broker::Queue&, framing::SequenceNumber& result);
+    /** Get position of the next message after 'from' in the queue.
+     *@return false if none found.
+     */
+    static bool getNext(broker::Queue&, framing::SequenceNumber from,
+                        framing::SequenceNumber& result);
 
     ReplicatingSubscription(broker::SemanticState* parent,
                             const std::string& name, boost::shared_ptr<broker::Queue> ,
@@ -76,56 +99,46 @@ class ReplicatingSubscription : public b
 
     ~ReplicatingSubscription();
 
-    // QueueObserver overrides.
-    bool deliver(broker::QueuedMessage& msg);
-    void enqueued(const broker::QueuedMessage&);
-    void dequeued(const broker::QueuedMessage&);
-    void acquired(const broker::QueuedMessage&) {}
-    void requeued(const broker::QueuedMessage&) {}
+    // Called via QueueGuard::dequeued.
+    // Returns nothing: the dequeued position is recorded and sent to the backup later.
+    void dequeued(const broker::QueuedMessage& qm);
+
+    // Called during initial scan for dequeues.
+    void dequeued(framing::SequenceNumber first, framing::SequenceNumber last);
 
     // Consumer overrides.
+    bool deliver(broker::QueuedMessage& msg);
     void cancel();
     void acknowledged(const broker::QueuedMessage&);
     bool browseAcquired() const { return true; }
+    // Hide the "queue deleted" error for a ReplicatingSubscription when a
+    // queue is deleted, this is normal and not an error.
+    bool hideDeletedError() { return true; }
+
+    /** Initialization that must be done separately from construction
+     * because it requires a shared_ptr to this to exist.
+     */
+    void initialize();
 
-    bool hideDeletedError();
+    BrokerInfo getBrokerInfo() const { return info; }
 
   protected:
     bool doDispatch();
+
   private:
-    typedef std::map<framing::SequenceNumber, broker::QueuedMessage> Delayed;
-    std::string logPrefix, logSuffix;
-    boost::shared_ptr<broker::Queue> events;
-    boost::shared_ptr<broker::Consumer> consumer;
-    Delayed delayed;
+    std::string logPrefix;
+    boost::shared_ptr<broker::Queue> dummy; // Used to send event messages
     framing::SequenceSet dequeues;
     framing::SequenceNumber backupPosition;
-
-    void complete(const broker::QueuedMessage&, const sys::Mutex::ScopedLock&);
-    void cancelComplete(const Delayed::value_type& v, const sys::Mutex::ScopedLock&);
-    void sendDequeueEvent(const sys::Mutex::ScopedLock&);
-    void sendPositionEvent(framing::SequenceNumber, const sys::Mutex::ScopedLock&);
-    void sendEvent(const std::string& key, framing::Buffer&,
-                   const sys::Mutex::ScopedLock&);
-
-    class DelegatingConsumer : public Consumer
-    {
-      public:
-        DelegatingConsumer(ReplicatingSubscription&);
-        ~DelegatingConsumer();
-        bool deliver(broker::QueuedMessage& msg);
-        void notify();
-        bool filter(boost::intrusive_ptr<broker::Message>);
-        bool accept(boost::intrusive_ptr<broker::Message>);
-        void cancel() {}
-        void acknowledged(const broker::QueuedMessage&) {}
-        bool browseAcquired() const;
-
-        broker::OwnershipToken* getSession();
-
-      private:
-        ReplicatingSubscription& delegate;
-    };
+    bool ready;
+    BrokerInfo info;
+    boost::shared_ptr<QueueGuard> guard;
+
+    void sendDequeueEvent(sys::Mutex::ScopedLock&);
+    void sendPositionEvent(framing::SequenceNumber, sys::Mutex::ScopedLock&);
+    void setReady();
+    void sendEvent(const std::string& key, framing::Buffer&);
+  friend struct Factory;
 };
 
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/Settings.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/Settings.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/Settings.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/Settings.h Fri Aug  3 12:13:32 2012
@@ -22,7 +22,7 @@
  *
  */
 
-#include "ReplicateLevel.h"
+#include "types.h"
 #include <string>
 
 namespace qpid {
@@ -34,13 +34,15 @@ namespace ha {
 class Settings
 {
   public:
-    Settings() : cluster(false), expectedBackups(0), replicateDefault(RL_NONE) {}
+    Settings() : cluster(false), replicateDefault(NONE), backupTimeout(5)
+    {}
+
     bool cluster;               // True if we are a cluster member.
     std::string clientUrl;
     std::string brokerUrl;
-    size_t expectedBackups;
-    ReplicateLevel replicateDefault;
+    Enum<ReplicateLevel> replicateDefault;
     std::string username, password, mechanism;
+    double backupTimeout;
   private:
 };
 }} // namespace qpid::ha

Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/management-schema.xml
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/management-schema.xml?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/management-schema.xml (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/management-schema.xml Fri Aug  3 12:13:32 2012
@@ -25,37 +25,39 @@
 
     <property name="status" type="sstr" desc="HA status: primary or backup"/>
 
-    <property name="brokers" type="sstr"
-	      desc="Multiple-address URL used by HA brokers to connect to each other."/>
+    <property name="brokersUrl" type="sstr"
+	      desc="URL with address of each broker in the cluster."/>
 
-    <property name="publicBrokers" type="sstr"
-	      desc="Multiple-address URL used by clients to connect to the HA brokers."/>
+    <property name="publicUrl" type="sstr"
+	      desc="URL advertized to clients to connect to the cluster."/>
 
-    <property name="expectedBackups" type="uint16"
-	      desc="Number of HA backup brokers expected."/>
+    <property name="replicateDefault" type="sstr"
+	      desc="Replication for queues/exchanges with no qpid.replicate argument"/>
 
-    <property
-	name="replicateDefault" type="sstr"
-	desc="Replicate value for queues/exchanges without a qpid.replicate argument"/>
+    <property name="members" type="list" desc="List of brokers in the cluster"/>
+
+    <property name="systemId" type="uuid" desc="Identifies the system."/>
 
     <method name="promote" desc="Promote a backup broker to primary."/>
 
-    <method name="setBrokers" desc="Set URL for HA brokers to connect to each other.">
+    <method name="setBrokersUrl" desc="URL listing each broker in the cluster.">
       <arg name="url" type="sstr" dir="I"/>
     </method>
 
-    <method name="setPublicBrokers" desc="Set URL for clients to connect to  HA brokers">
+    <method name="setPublicUrl" desc="URL advertized to clients.">
       <arg name="url" type="sstr" dir="I"/>
     </method>
 
-    <method name="setExpectedBackups" desc="Set number of backups expected">
-      <arg name="expectedBackups" type="uint16" dir="I"/>
-    </method>
-
     <method name="replicate" desc="Replicate individual queue from remote broker.">
       <arg name="broker" type="sstr" dir="I"/>
       <arg name="queue" type="sstr" dir="I"/>
     </method>
   </class>
 
+  <eventArguments>
+    <arg name="members" type="list" desc="List of broker information maps"/>
+  </eventArguments>
+
+  <event name="membersUpdate" sev="inform" args="members"/>
+
 </schema>

Modified: qpid/branches/asyncstore/cpp/src/qpid/log/Logger.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/log/Logger.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/log/Logger.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/log/Logger.cpp Fri Aug  3 12:13:32 2012
@@ -47,7 +47,7 @@ using namespace std;
 typedef sys::Mutex::ScopedLock ScopedLock;
 
 inline void Logger::enable_unlocked(Statement* s) {
-    s->enabled=selector.isEnabled(s->level, s->function);
+    s->enabled=selector.isEnabled(s->level, s->function, s->category);
 }
 
 Logger& Logger::instance() {
@@ -95,6 +95,8 @@ void Logger::log(const Statement& s, con
         else
             qpid::sys::outputFormattedNow(os);
     }
+    if (flags&CATEGORY)
+        os << "[" << CategoryTraits::name(s.category) << "] ";
     if (flags&LEVEL)
         os << LevelTraits::name(s.level) << " ";
     if (flags&THREAD)
@@ -144,7 +146,8 @@ int Logger::format(const Options& opts) 
         bitIf(opts.source, (FILE|LINE)) |
         bitIf(opts.function, FUNCTION) |
         bitIf(opts.thread, THREAD) |
-        bitIf(opts.hiresTs, HIRES);
+        bitIf(opts.hiresTs, HIRES) |
+        bitIf(opts.category, CATEGORY);
     format(flags);
     return flags;
 }

Modified: qpid/branches/asyncstore/cpp/src/qpid/log/Options.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/log/Options.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/log/Options.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/log/Options.cpp Fri Aug  3 12:13:32 2012
@@ -39,6 +39,7 @@ Options::Options(const std::string& argv
     source(false),
     function(false),
     hiresTs(false),
+    category(true),
     trace(false),
     sinkOptions (SinkOptions::create(argv0_))
 {
@@ -49,15 +50,23 @@ Options::Options(const std::string& argv
     for (int i = 1; i < LevelTraits::COUNT; ++i)
         levels << " " << LevelTraits::name(Level(i));
 
+    ostringstream categories;
+    categories << CategoryTraits::name(Category(0));
+    for (int i = 1; i < CategoryTraits::COUNT; ++i)
+        categories << " " << CategoryTraits::name(Category(i));
+
     addOptions()
         ("trace,t", optValue(trace), "Enables all logging" )
         ("log-enable", optValue(selectors, "RULE"),
-         ("Enables logging for selected levels and components. " 
+         ("Enables logging for selected levels and components. "
           "RULE is in the form 'LEVEL[+][:PATTERN]' "
-          "Levels are one of: \n\t "+levels.str()+"\n"
+          "LEVEL is one of: \n\t "+levels.str()+"\n"
+          "PATTERN is a function name or a catogory: \n\t "+categories.str()+"\n"
           "For example:\n"
           "\t'--log-enable warning+' "
           "logs all warning, error and critical messages.\n"
+          "\t'--log-enable trace+:Broker' "
+          "logs all category 'Broker' messages.\n"
           "\t'--log-enable debug:framing' "
           "logs debug messages from the framing namespace. "
           "This option can be used multiple times").c_str())
@@ -67,6 +76,7 @@ Options::Options(const std::string& argv
         ("log-thread", optValue(thread,"yes|no"), "Include thread ID in log messages")
         ("log-function", optValue(function,"yes|no"), "Include function signature in log messages")
         ("log-hires-timestamp", optValue(hiresTs,"yes|no"), "Use hi-resolution timestamps in log messages")
+        ("log-category", optValue(category,"yes|no"), "Include category in log messages")
         ("log-prefix", optValue(prefix,"STRING"), "Prefix to append to all log messages")
         ;
     add(*sinkOptions);
@@ -83,6 +93,7 @@ Options::Options(const Options &o) :
     source(o.source),
     function(o.function),
     hiresTs(o.hiresTs),
+    category(o.category),
     trace(o.trace),
     prefix(o.prefix),
     sinkOptions (SinkOptions::create(o.argv0))
@@ -101,11 +112,12 @@ Options& Options::operator=(const Option
         source = x.source;
         function = x.function;
         hiresTs = x.hiresTs;
+        category = x.category;
         trace = x.trace;
         prefix = x.prefix;
         *sinkOptions = *x.sinkOptions;
     }
     return *this;
 }
-        
+
 }} // namespace qpid::log

Modified: qpid/branches/asyncstore/cpp/src/qpid/log/Selector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/log/Selector.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/log/Selector.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/log/Selector.cpp Fri Aug  3 12:13:32 2012
@@ -37,18 +37,29 @@ void Selector::enable(const string& enab
         level=enableStr.substr(0,c);
         pattern=enableStr.substr(c+1);
     }
+    bool isCat = CategoryTraits::isCategory(pattern);
     if (!level.empty() && level[level.size()-1]=='+') {
         for (int i =  LevelTraits::level(level.substr(0,level.size()-1));
              i < LevelTraits::COUNT;
-             ++i)
-            enable(Level(i), pattern);
+             ++i) {
+            if (isCat) {
+                enable(Level(i), CategoryTraits::category(pattern));
+            } else {
+                enable(Level(i), pattern);
+            }
+        }
     }
     else {
-        enable(LevelTraits::level(level), pattern);
+        if (isCat) {
+            enable(LevelTraits::level(level), CategoryTraits::category(pattern));
+        } else {
+            enable(LevelTraits::level(level), pattern);
+        }
     }
 }
 
 Selector::Selector(const Options& opt){
+    reset();
     for_each(opt.selectors.begin(), opt.selectors.end(),
              boost::bind(&Selector::enable, this, _1));
 }
@@ -58,11 +69,17 @@ bool Selector::isEnabled(Level level, co
     for (std::vector<std::string>::iterator i=substrings[level].begin();
          i != substrings[level].end();
          ++i)
-    {
-        if (std::search(function, functionEnd, i->begin(), i->end()) != functionEnd)
-            return true;
-    }
+        {
+            if (std::search(function, functionEnd, i->begin(), i->end()) != functionEnd)
+                return true;
+        }
     return false;
 }
 
+bool Selector::isEnabled(Level level, const char* function, Category category) { // category-aware overload
+    if (catFlags[level][category])       // flag is set for this level/category pair
+        return true;
+    return isEnabled(level, function);   // otherwise fall back to the function-name substring match
+}
+
 }} // namespace qpid::log

Modified: qpid/branches/asyncstore/cpp/src/qpid/log/Statement.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/log/Statement.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/log/Statement.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/log/Statement.cpp Fri Aug  3 12:13:32 2012
@@ -36,7 +36,7 @@ std::string quote(const std::string& str
     size_t n = std::count_if(str.begin(), str.end(), nonPrint);
     if (n==0) return str;
     std::string ret;
-    ret.reserve(str.size()+2*n); // Avoid extra allocations.
+    ret.reserve(str.size()+3*n); // Avoid extra allocations.
     for (std::string::const_iterator i = str.begin(); i != str.end(); ++i) {
         if (nonPrint(*i)) {
             ret.push_back('\\');
@@ -50,10 +50,42 @@ std::string quote(const std::string& str
 }
 }
 
+//
+// Instance of name hints
+//
+static CategoryFileNameHints filenameHints;
+
+
+Category CategoryFileNameHints::categoryOf(const char* const fName) { // map a source file name to a category
+    for (std::list<std::pair<const char* const, Category> >::iterator
+           it  = filenameHints.hintList.begin();
+           it != filenameHints.hintList.end();
+         ++it) {                          // hints are tried in list order
+        if (strstr(fName, (const char* const)it->first) != 0) { // hint text occurs somewhere in fName
+            return it->second;           // first matching hint wins
+        }
+    }
+    return unspecified;                  // no hint matched
+}
+
+
+void Statement::categorize(Statement& s) {
+    // Given a statement and its category:
+    // if the category is unspecified, try to find a better one by
+    // matching the statement's source file name against the filename hints.
+    if (s.category == log::unspecified) {
+        s.category = CategoryFileNameHints::categoryOf(s.file);
+    } else {
+        // Already has a category, so leave it alone.
+    }
+}
+
+
 void Statement::log(const std::string& message) {
     Logger::instance().log(*this, quote(message));
 }
 
+
 Statement::Initializer::Initializer(Statement& s) : statement(s) {
     // QPID-3891
     // From the given BOOST_CURRENT_FUNCTION name extract only the
@@ -99,16 +131,22 @@ Statement::Initializer::Initializer(Stat
         // no function-name pointer to process
     }
 
+    Statement::categorize(s);
     Logger::instance().add(s);
 }
 
+
 namespace {
 const char* names[LevelTraits::COUNT] = {
     "trace", "debug", "info", "notice", "warning", "error", "critical"
 };
 
-} // namespace
+const char* catNames[CategoryTraits::COUNT] = { // category names, indexed by Category value
+    "Security", "Broker", "Management", "Protocol", "System", "HA", "Messaging",
+    "Store", "Network", "Test", "Client", "Model", "Unspecified"
+}; // NOTE(review): order presumably mirrors the Category enum declared elsewhere - confirm
 
+} // namespace
 Level LevelTraits::level(const char* name) {
     for (int i =0; i < LevelTraits::COUNT; ++i) {
         if (strcmp(names[i], name)==0)
@@ -121,4 +159,23 @@ const char* LevelTraits::name(Level l) {
     return names[l];
 }
 
+bool CategoryTraits::isCategory(const std::string& name) { // true if name is one of the catNames entries
+    for (int i =0; i < CategoryTraits::COUNT; ++i) {
+        if (strcmp(catNames[i], name.c_str())==0)   // exact, case-sensitive comparison
+            return true;
+    }
+    return false;
+}
+
+Category CategoryTraits::category(const char* name) { // convert a category name to its Category value
+    for (int i =0; i < CategoryTraits::COUNT; ++i) {
+        if (strcmp(catNames[i], name)==0)   // exact, case-sensitive comparison
+            return Category(i);             // the table index is used as the enum value
+    }
+    throw std::runtime_error(std::string("Invalid log category name: ")+name); // name not in catNames
+}
+
+const char* CategoryTraits::name(Category c) { // display name for a category
+    return catNames[c];   // direct table lookup; c is not range-checked here
+}
 }} // namespace qpid::log

Modified: qpid/branches/asyncstore/cpp/src/qpid/log/posix/SinkOptions.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/log/posix/SinkOptions.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/log/posix/SinkOptions.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/log/posix/SinkOptions.cpp Fri Aug  3 12:13:32 2012
@@ -22,11 +22,14 @@
 #include "qpid/log/OstreamOutput.h"
 #include "qpid/memory.h"
 #include "qpid/Exception.h"
+
 #include <iostream>
 #include <map>
 #include <string>
 #include <syslog.h>
 
+#include <boost/lexical_cast.hpp>
+
 using std::string;
 using qpid::Exception;
 
@@ -90,7 +93,7 @@ public:
     string name(int value) const {
         ByValue::const_iterator i = byValue.find(value);
         if (i == byValue.end())
-            throw Exception("Not a valid syslog value: " + value);
+            throw Exception("Not a valid syslog value: " + boost::lexical_cast<string>(value)); // convert first: "literal" + int is pointer arithmetic, not concatenation
         return i->second;
     }
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/management/Buffer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/management/Buffer.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/management/Buffer.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/management/Buffer.cpp Fri Aug  3 12:13:32 2012
@@ -29,12 +29,11 @@ namespace management {
 
 Buffer::Buffer(char* data, uint32_t size) : impl(new framing::Buffer(data, size)) {}
 Buffer::~Buffer() { delete impl; }
-void Buffer::record() { impl->record(); }
-void Buffer::restore(bool reRecord) { impl->restore(reRecord); }
 void Buffer::reset() { impl->reset(); }
 uint32_t Buffer::available() { return impl->available(); }
 uint32_t Buffer::getSize() { return impl->getSize(); }
 uint32_t Buffer::getPosition() { return impl->getPosition(); }
+void Buffer::setPosition(uint32_t p) { impl->setPosition(p); }
 char* Buffer::getPointer() { return impl->getPointer(); }
 void Buffer::putOctet(uint8_t i) { impl->putOctet(i); }
 void Buffer::putShort(uint16_t i) { impl->putShort(i); }

Modified: qpid/branches/asyncstore/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/management/ManagementAgent.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/management/ManagementAgent.cpp Fri Aug  3 12:13:32 2012
@@ -1344,18 +1344,19 @@ void ManagementAgent::handleMethodReques
             outBuffer.putLong        (Manageable::STATUS_PARAMETER_INVALID);
             outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_PARAMETER_INVALID));
         }
-        else
+        else {
+            uint32_t pos = outBuffer.getPosition();
             try {
-                outBuffer.record();
                 sys::Mutex::ScopedUnlock u(userLock);
                 string outBuf;
                 iter->second->doMethod(methodName, inArgs, outBuf, userId);
                 outBuffer.putRawData(outBuf);
             } catch(exception& e) {
-                outBuffer.restore();
+                outBuffer.setPosition(pos);;
                 outBuffer.putLong(Manageable::STATUS_EXCEPTION);
                 outBuffer.putMediumString(e.what());
             }
+        }
     }
 
     outLen = MA_BUFFER_SIZE - outBuffer.available();
@@ -1662,11 +1663,11 @@ void ManagementAgent::handleSchemaRespon
     string         packageName;
     SchemaClassKey key;
 
-    inBuffer.record();
+    uint32_t pos = inBuffer.getPosition();
     inBuffer.getOctet();
     inBuffer.getShortString(packageName);
     key.decode(inBuffer);
-    inBuffer.restore();
+    inBuffer.setPosition(pos);;
 
     QPID_LOG(debug, "RECV SchemaResponse class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << " seq=" << sequence);
 
@@ -2426,7 +2427,6 @@ size_t ManagementAgent::validateTableSch
     uint8_t  hash[16];
 
     try {
-        inBuffer.record();
         uint8_t kind = inBuffer.getOctet();
         if (kind != ManagementItem::CLASS_KIND_TABLE)
             return 0;
@@ -2468,7 +2468,7 @@ size_t ManagementAgent::validateTableSch
     }
 
     end = inBuffer.getPosition();
-    inBuffer.restore(); // restore original position
+    inBuffer.setPosition(start); // restore original position
     return end - start;
 }
 
@@ -2480,7 +2480,6 @@ size_t ManagementAgent::validateEventSch
     uint8_t  hash[16];
 
     try {
-        inBuffer.record();
         uint8_t kind = inBuffer.getOctet();
         if (kind != ManagementItem::CLASS_KIND_EVENT)
             return 0;
@@ -2507,7 +2506,7 @@ size_t ManagementAgent::validateEventSch
     }
 
     end = inBuffer.getPosition();
-    inBuffer.restore(); // restore original position
+    inBuffer.setPosition(start); // restore original position
     return end - start;
 }
 

Propchange: qpid/branches/asyncstore/cpp/src/qpid/management/ManagementAgent.cpp
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp:r1333988-1368650

Propchange: qpid/branches/asyncstore/cpp/src/qpid/management/ManagementAgent.h
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h:r1333988-1368650

Modified: qpid/branches/asyncstore/cpp/src/qpid/management/ManagementDirectExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/management/ManagementDirectExchange.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/management/ManagementDirectExchange.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/management/ManagementDirectExchange.cpp Fri Aug  3 12:13:32 2012
@@ -28,7 +28,7 @@ using namespace qpid::broker;
 using namespace qpid::framing;
 using namespace qpid::sys;
 
-ManagementDirectExchange::ManagementDirectExchange(const string& _name, Manageable* _parent, Broker* b) :
+ManagementDirectExchange::ManagementDirectExchange(const std::string& _name, Manageable* _parent, Broker* b) :
     Exchange (_name, _parent, b),
     DirectExchange(_name, _parent, b),
     managementAgent(0) {}
@@ -43,7 +43,7 @@ ManagementDirectExchange::ManagementDire
 void ManagementDirectExchange::route(Deliverable&      msg)
 {
     bool routeIt = true;
-    const string& routingKey = msg.getMessage().getRoutingKey();
+    const std::string& routingKey = msg.getMessage().getRoutingKey();
     const FieldTable* args = msg.getMessage().getApplicationHeaders();
 
     if (managementAgent)

Modified: qpid/branches/asyncstore/cpp/src/qpid/management/ManagementTopicExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/management/ManagementTopicExchange.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/management/ManagementTopicExchange.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/management/ManagementTopicExchange.cpp Fri Aug  3 12:13:32 2012
@@ -27,7 +27,7 @@ using namespace qpid::broker;
 using namespace qpid::framing;
 using namespace qpid::sys;
 
-ManagementTopicExchange::ManagementTopicExchange(const string& _name, Manageable* _parent, Broker* b) :
+ManagementTopicExchange::ManagementTopicExchange(const std::string& _name, Manageable* _parent, Broker* b) :
     Exchange (_name, _parent, b),
     TopicExchange(_name, _parent, b),
     managementAgent(0) {}
@@ -42,7 +42,7 @@ ManagementTopicExchange::ManagementTopic
 void ManagementTopicExchange::route(Deliverable&      msg)
 {
     bool routeIt = true;
-    const string& routingKey = msg.getMessage().getRoutingKey();
+    const std::string& routingKey = msg.getMessage().getRoutingKey();
     const FieldTable* args = msg.getMessage().getApplicationHeaders();
 
     // Intercept management agent commands
@@ -54,7 +54,7 @@ void ManagementTopicExchange::route(Deli
 }
 
 bool ManagementTopicExchange::bind(Queue::shared_ptr queue,
-                                   const string& routingKey,
+                                   const std::string& routingKey,
                                    const qpid::framing::FieldTable* args)
 {
     if (qmfVersion == 1)

Modified: qpid/branches/asyncstore/cpp/src/qpid/messaging/PrivateImplRef.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/messaging/PrivateImplRef.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/messaging/PrivateImplRef.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/messaging/PrivateImplRef.h Fri Aug  3 12:13:32 2012
@@ -77,15 +77,15 @@ template <class T> class PrivateImplRef 
     /** Set the implementation pointer in a handle */
     static void set(T& t, const intrusive_ptr& p) {
         if (t.impl == p) return;
-        if (t.impl) boost::intrusive_ptr_release(t.impl);
+        if (t.impl) intrusive_ptr_release(t.impl);
         t.impl = p.get();
-        if (t.impl) boost::intrusive_ptr_add_ref(t.impl);
+        if (t.impl) intrusive_ptr_add_ref(t.impl);
     }
 
     // Helper functions to implement the ctor, dtor, copy, assign
-    static void ctor(T& t, Impl* p) { t.impl = p; if (p) boost::intrusive_ptr_add_ref(p); }
+    static void ctor(T& t, Impl* p) { t.impl = p; if (p) intrusive_ptr_add_ref(p); }
     static void copy(T& t, const T& x) { if (&t == &x) return; t.impl = 0; assign(t, x); }
-    static void dtor(T& t) { if(t.impl) boost::intrusive_ptr_release(t.impl); }
+    static void dtor(T& t) { if(t.impl) intrusive_ptr_release(t.impl); }
     static T& assign(T& t, const T& x) { set(t, get(x)); return t;}
 };
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/replication/ReplicationExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/replication/ReplicationExchange.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/replication/ReplicationExchange.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/replication/ReplicationExchange.cpp Fri Aug  3 12:13:32 2012
@@ -184,7 +184,7 @@ bool ReplicationExchange::unbind(Queue::
     throw NotImplementedException("Replication exchange does not support unbind operation");
 }
 
-bool ReplicationExchange::isBound(Queue::shared_ptr /*queue*/, const string* const /*routingKey*/, const FieldTable* const /*args*/)
+bool ReplicationExchange::isBound(Queue::shared_ptr /*queue*/, const std::string* const /*routingKey*/, const FieldTable* const /*args*/)
 {
     return false;
 }

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.cpp Fri Aug  3 12:13:32 2012
@@ -23,6 +23,7 @@
 #include "qpid/sys/AsynchIO.h"
 #include "qpid/sys/Socket.h"
 #include "qpid/sys/SecuritySettings.h"
+#include "qpid/sys/Timer.h"
 #include "qpid/framing/AMQP_HighestVersion.h"
 #include "qpid/framing/ProtocolInitiation.h"
 #include "qpid/log/Statement.h"
@@ -41,11 +42,30 @@ struct Buff : public AsynchIO::BufferBas
     { delete [] bytes;}
 };
 
-AsynchIOHandler::AsynchIOHandler(std::string id, ConnectionCodec::Factory* f) :
+struct ProtocolTimeoutTask : public sys::TimerTask { // timer task that aborts a connection when it fires
+    AsynchIOHandler& handler;   // handler to abort; held by reference, so it must stay valid while the task can fire
+    std::string id;             // connection identifier, used in the log message below
+
+    ProtocolTimeoutTask(const std::string& i, const Duration& timeout, AsynchIOHandler& h) :
+        TimerTask(timeout, "ProtocolTimeout"),
+        handler(h),
+        id(i)
+    {}
+
+    void fire() {
+        // If this fires, the connection was not negotiated within the timeout period,
+        // so schedule closing the connection for the IO thread.
+        QPID_LOG(error, "Connection " << id << " No protocol received closing");
+        handler.abort();
+    }
+};
+
+AsynchIOHandler::AsynchIOHandler(const std::string& id, ConnectionCodec::Factory* f) :
     identifier(id),
     aio(0),
     factory(f),
     codec(0),
+    reads(0),
     readError(false),
     isClient(false),
     readCredit(InfiniteCredit)
@@ -54,12 +74,18 @@ AsynchIOHandler::AsynchIOHandler(std::st
 AsynchIOHandler::~AsynchIOHandler() {
     if (codec)
         codec->closed();
+    if (timeoutTimerTask)             // assigned only in init(), so it may still be null here
+        timeoutTimerTask->cancel();   // the task holds a reference to *this; cancel it before the handler goes away
     delete codec;
 }
 
-void AsynchIOHandler::init(AsynchIO* a, int numBuffs) {
+void AsynchIOHandler::init(qpid::sys::AsynchIO* a, qpid::sys::Timer& timer, uint32_t maxTime, int numBuffs) {
     aio = a;
 
+    // Start timer for this connection
+    timeoutTimerTask = new ProtocolTimeoutTask(identifier, maxTime*TIME_MSEC, *this);
+    timer.add(timeoutTimerTask);
+
     // Give connection some buffers to use
     for (int i = 0; i < numBuffs; i++) {
         aio->queueReadBuffer(new Buff);
@@ -129,10 +155,18 @@ void AsynchIOHandler::readbuff(AsynchIO&
         }
     }
 
+    ++reads;
     size_t decoded = 0;
     if (codec) {                // Already initiated
         try {
             decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount);
+            // When we've decoded 3 reads (probably frames) we will have authenticated and
+            // started heartbeats, if specified, in many (but not all) cases so now we will cancel
+            // the idle connection timeout - this is really hacky, and would be better implemented
+            // in the connection, but that isn't actually created until the first decode.
+            if (reads == 3) {
+                timeoutTimerTask->cancel();
+            }
         }catch(const std::exception& e){
             QPID_LOG(error, e.what());
             readError = true;
@@ -143,6 +177,7 @@ void AsynchIOHandler::readbuff(AsynchIO&
         framing::ProtocolInitiation protocolInit;
         if (protocolInit.decode(in)) {
             decoded = in.getPosition();
+
             QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")");
             try {
                 codec = factory->create(protocolInit.getVersion(), *this, identifier, SecuritySettings());
@@ -202,6 +237,10 @@ void AsynchIOHandler::idle(AsynchIO&){
     if (isClient && codec == 0) {
         codec = factory->create(*this, identifier, SecuritySettings());
         write(framing::ProtocolInitiation(codec->getVersion()));
+        // We've just sent the protocol negotiation so we can cancel the timeout for that
+        // This is not ideal, because we've not received anything yet, but heartbeats will
+        // be active soon
+        timeoutTimerTask->cancel();
         return;
     }
     if (codec == 0) return;

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/AsynchIOHandler.h Fri Aug  3 12:13:32 2012
@@ -27,6 +27,8 @@
 #include "qpid/sys/Mutex.h"
 #include "qpid/CommonImportExport.h"
 
+#include <boost/intrusive_ptr.hpp>
+
 namespace qpid {
 
 namespace framing {
@@ -38,24 +40,28 @@ namespace sys {
 class AsynchIO;
 struct AsynchIOBufferBase;
 class Socket;
+class Timer;
+class TimerTask;
 
 class AsynchIOHandler : public OutputControl {
     std::string identifier;
     AsynchIO* aio;
     ConnectionCodec::Factory* factory;
     ConnectionCodec* codec;
+    uint32_t reads;
     bool readError;
     bool isClient;
     AtomicValue<int32_t> readCredit;
     static const int32_t InfiniteCredit = -1;
     Mutex creditLock;
+    boost::intrusive_ptr<sys::TimerTask> timeoutTimerTask;
 
     void write(const framing::ProtocolInitiation&);
 
   public:
-    QPID_COMMON_EXTERN AsynchIOHandler(std::string id, ConnectionCodec::Factory* f);
+    QPID_COMMON_EXTERN AsynchIOHandler(const std::string& id, qpid::sys::ConnectionCodec::Factory* f );
     QPID_COMMON_EXTERN ~AsynchIOHandler();
-    QPID_COMMON_EXTERN void init(AsynchIO* a, int numBuffs);
+    QPID_COMMON_EXTERN void init(AsynchIO* a, Timer& timer, uint32_t maxTime, int numBuffs);
 
     QPID_COMMON_INLINE_EXTERN void setClient() { isClient = true; }
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/SslPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/SslPlugin.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/SslPlugin.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/SslPlugin.cpp Fri Aug  3 12:13:32 2012
@@ -39,6 +39,8 @@
 namespace qpid {
 namespace sys {
 
+class Timer;
+
 using namespace qpid::sys::ssl;
 
 struct SslServerOptions : ssl::SslOptions
@@ -68,6 +70,8 @@ class SslProtocolFactoryTmpl : public Pr
 
     typedef SslAcceptorTmpl<T> SslAcceptor;
 
+    Timer& brokerTimer;
+    uint32_t maxNegotiateTime;
     const bool tcpNoDelay;
     T listener;
     const uint16_t listeningPort;
@@ -75,7 +79,7 @@ class SslProtocolFactoryTmpl : public Pr
     bool nodict;
 
   public:
-    SslProtocolFactoryTmpl(const SslServerOptions&, int backlog, bool nodelay);
+    SslProtocolFactoryTmpl(const SslServerOptions&, int backlog, bool nodelay, Timer& timer, uint32_t maxTime);
     void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
     void connect(Poller::shared_ptr, const std::string& host, const std::string& port,
                  ConnectionCodec::Factory*,
@@ -132,16 +136,18 @@ static struct SslPlugin : public Plugin 
                 try {
                     ssl::initNSS(options, true);
                     nssInitialized = true;
-                    
+
                     const broker::Broker::Options& opts = broker->getOptions();
 
                     ProtocolFactory::shared_ptr protocol(options.multiplex ?
                         static_cast<ProtocolFactory*>(new SslMuxProtocolFactory(options,
                                                   opts.connectionBacklog,
-                                                  opts.tcpNoDelay)) :
+                                                  opts.tcpNoDelay,
+                                                  broker->getTimer(), opts.maxNegotiateTime)) :
                         static_cast<ProtocolFactory*>(new SslProtocolFactory(options,
                                                opts.connectionBacklog,
-                                               opts.tcpNoDelay)));
+                                               opts.tcpNoDelay,
+                                               broker->getTimer(), opts.maxNegotiateTime)));
                     QPID_LOG(notice, "Listening for " <<
                                      (options.multiplex ? "SSL or TCP" : "SSL") <<
                                      " connections on TCP port " <<
@@ -156,14 +162,16 @@ static struct SslPlugin : public Plugin 
 } sslPlugin;
 
 template <class T>
-SslProtocolFactoryTmpl<T>::SslProtocolFactoryTmpl(const SslServerOptions& options, int backlog, bool nodelay) :
+SslProtocolFactoryTmpl<T>::SslProtocolFactoryTmpl(const SslServerOptions& options, int backlog, bool nodelay, Timer& timer, uint32_t maxTime) :
+    brokerTimer(timer),
+    maxNegotiateTime(maxTime),
     tcpNoDelay(nodelay), listeningPort(listener.listen(options.port, backlog, options.certName, options.clientAuth)),
     nodict(options.nodict)
 {}
 
 void SslEstablished(Poller::shared_ptr poller, const qpid::sys::SslSocket& s,
                     ConnectionCodec::Factory* f, bool isClient,
-                    bool tcpNoDelay, bool nodict) {
+                    Timer& timer, uint32_t maxTime, bool tcpNoDelay, bool nodict) {
     qpid::sys::ssl::SslHandler* async = new qpid::sys::ssl::SslHandler(s.getFullAddress(), f, nodict);
 
     if (tcpNoDelay) {
@@ -183,7 +191,7 @@ void SslEstablished(Poller::shared_ptr p
                                  boost::bind(&qpid::sys::ssl::SslHandler::nobuffs, async, _1),
                                  boost::bind(&qpid::sys::ssl::SslHandler::idle, async, _1));
 
-    async->init(aio, 4);
+    async->init(aio,timer, maxTime, 4);
     aio->start(poller);
 }
 
@@ -192,7 +200,7 @@ void SslProtocolFactory::established(Pol
                                      ConnectionCodec::Factory* f, bool isClient) {
     const SslSocket *sslSock = dynamic_cast<const SslSocket*>(&s);
 
-    SslEstablished(poller, *sslSock, f, isClient, tcpNoDelay, nodict);
+    SslEstablished(poller, *sslSock, f, isClient, brokerTimer, maxNegotiateTime, tcpNoDelay, nodict);
 }
 
 template <class T>
@@ -216,7 +224,7 @@ void SslMuxProtocolFactory::established(
     const SslSocket *sslSock = dynamic_cast<const SslSocket*>(&s);
 
     if (sslSock) {
-        SslEstablished(poller, *sslSock, f, isClient, tcpNoDelay, nodict);
+        SslEstablished(poller, *sslSock, f, isClient, brokerTimer, maxNegotiateTime, tcpNoDelay, nodict);
         return;
     }
 
@@ -239,7 +247,7 @@ void SslMuxProtocolFactory::established(
        boost::bind(&AsynchIOHandler::nobuffs, async, _1),
        boost::bind(&AsynchIOHandler::idle, async, _1));
 
-    async->init(aio, 4);
+    async->init(aio, brokerTimer, maxNegotiateTime, 4);
     aio->start(poller);
 }
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/TCPIOPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/TCPIOPlugin.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/TCPIOPlugin.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/TCPIOPlugin.cpp Fri Aug  3 12:13:32 2012
@@ -36,14 +36,21 @@
 namespace qpid {
 namespace sys {
 
+class Timer;
+
 class AsynchIOProtocolFactory : public ProtocolFactory {
-    const bool tcpNoDelay;
     boost::ptr_vector<Socket> listeners;
     boost::ptr_vector<AsynchAcceptor> acceptors;
+    Timer& brokerTimer;
+    uint32_t maxNegotiateTime;
     uint16_t listeningPort;
+    const bool tcpNoDelay;
 
   public:
-    AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay, bool shouldListen);
+    AsynchIOProtocolFactory(const std::string& host, const std::string& port,
+                            int backlog, bool nodelay,
+                            Timer& timer, uint32_t maxTime,
+                            bool shouldListen);
     void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
     void connect(Poller::shared_ptr, const std::string& host, const std::string& port,
                  ConnectionCodec::Factory*,
@@ -90,6 +97,7 @@ static class TCPIOPlugin : public Plugin
                     "", boost::lexical_cast<std::string>(opts.port),
                     opts.connectionBacklog,
                     opts.tcpNoDelay,
+                    broker->getTimer(), opts.maxNegotiateTime,
                     shouldListen));
 
             if (shouldListen) {
@@ -101,7 +109,12 @@ static class TCPIOPlugin : public Plugin
     }
 } tcpPlugin;
 
-AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, const std::string& port, int backlog, bool nodelay, bool shouldListen) :
+AsynchIOProtocolFactory::AsynchIOProtocolFactory(const std::string& host, const std::string& port,
+                                                 int backlog, bool nodelay,
+                                                 Timer& timer, uint32_t maxTime,
+                                                 bool shouldListen) :
+    brokerTimer(timer),
+    maxNegotiateTime(maxTime),
     tcpNoDelay(nodelay)
 {
     if (!shouldListen) {
@@ -153,7 +166,7 @@ void AsynchIOProtocolFactory::establishe
        boost::bind(&AsynchIOHandler::nobuffs, async, _1),
        boost::bind(&AsynchIOHandler::idle, async, _1));
 
-    async->init(aio, 4);
+    async->init(aio, brokerTimer, maxNegotiateTime, 4);
     aio->start(poller);
 }
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/Timer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/Timer.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/Timer.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/Timer.cpp Fri Aug  3 12:13:32 2012
@@ -35,7 +35,7 @@ TimerTask::TimerTask(Duration timeout, c
     sortTime(AbsTime::FarFuture()),
     period(timeout),
     nextFireTime(AbsTime::now(), timeout),
-    cancelled(false)
+    state(WAITING)
 {}
 
 TimerTask::TimerTask(AbsTime time, const std::string&  n) :
@@ -43,7 +43,7 @@ TimerTask::TimerTask(AbsTime time, const
     sortTime(AbsTime::FarFuture()),
     period(0),
     nextFireTime(time),
-    cancelled(false)
+    state(WAITING)
 {}
 
 TimerTask::~TimerTask() {}
@@ -52,27 +52,48 @@ bool TimerTask::readyToFire() const {
     return !(nextFireTime > AbsTime::now());
 }
 
+bool TimerTask::prepareToFire() {
+    Monitor::ScopedLock l(stateMonitor);
+    if (state != CANCELLED) {
+        state = CALLING;
+        return true;
+    } else {
+        return false;
+    }
+}
+
 void TimerTask::fireTask() {
-    cancelled = true;
     fire();
 }
 
+void TimerTask::finishFiring() {
+    Monitor::ScopedLock l(stateMonitor);
+    if (state != CANCELLED) {
+        state = WAITING;
+        stateMonitor.notifyAll();
+    }
+}
+
 // This can only be used to set up the next fire time, after the Timer has already fired
 void TimerTask::setupNextFire() {
     if (period && readyToFire()) {
         nextFireTime = max(AbsTime::now(), AbsTime(nextFireTime, period));
-        cancelled = false;
     } else {
         QPID_LOG(error, name << " couldn't setup next timer firing: " << Duration(nextFireTime, AbsTime::now()) << "[" << period << "]");
     }
 }
 
 // Only allow tasks to be delayed
-void TimerTask::restart() { nextFireTime = max(nextFireTime, AbsTime(AbsTime::now(), period)); }
+void TimerTask::restart() {
+    nextFireTime = max(nextFireTime, AbsTime(AbsTime::now(), period));
+}
 
 void TimerTask::cancel() {
-    ScopedLock<Mutex> l(callbackLock);
-    cancelled = true;
+    Monitor::ScopedLock l(stateMonitor);
+    while (state == CALLING) {
+        stateMonitor.wait();
+    }
+    state = CANCELLED;
 }
 
 void TimerTask::setFired() {
@@ -96,6 +117,22 @@ Timer::~Timer()
     stop();
 }
 
+class TimerTaskCallbackScope {
+    TimerTask& tt;
+public:
+    explicit TimerTaskCallbackScope(TimerTask& t) :
+        tt(t)
+    {}
+
+    operator bool() {
+        return !tt.prepareToFire();
+    }
+
+    ~TimerTaskCallbackScope() {
+        tt.finishFiring();
+    }
+};
+
 // TODO AStitcher 21/08/09 The thresholds for emitting warnings are a little arbitrary
 void Timer::run()
 {
@@ -112,8 +149,8 @@ void Timer::run()
             AbsTime start(AbsTime::now());
             Duration delay(t->sortTime, start);
             {
-            ScopedLock<Mutex> l(t->callbackLock);
-            if (t->cancelled) {
+            TimerTaskCallbackScope s(*t);
+            if (s) {
                 {
                     Monitor::ScopedUnlock u(monitor);
                     drop(t);

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/Timer.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/Timer.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/Timer.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/Timer.h Fri Aug  3 12:13:32 2012
@@ -40,6 +40,7 @@ class Timer;
 
 class TimerTask : public RefCounted {
   friend class Timer;
+  friend class TimerTaskCallbackScope;
   friend bool operator<(const boost::intrusive_ptr<TimerTask>&,
                         const boost::intrusive_ptr<TimerTask>&);
 
@@ -47,9 +48,11 @@ class TimerTask : public RefCounted {
     AbsTime sortTime;
     Duration period;
     AbsTime nextFireTime;
-    Mutex callbackLock;
-    volatile bool cancelled;
+    qpid::sys::Monitor stateMonitor;
+    enum {WAITING, CALLING, CANCELLED} state;
 
+    bool prepareToFire();
+    void finishFiring();
     bool readyToFire() const;
     void fireTask();
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/cyrus/CyrusSecurityLayer.cpp Fri Aug  3 12:13:32 2012
@@ -18,6 +18,7 @@
  * under the License.
  *
  */
+#include <unistd.h>
 #include "qpid/sys/cyrus/CyrusSecurityLayer.h"
 #include <algorithm>
 #include "qpid/framing/reply_exceptions.h"

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/posix/LockFile.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/posix/LockFile.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/posix/LockFile.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/posix/LockFile.cpp Fri Aug  3 12:13:32 2012
@@ -46,7 +46,7 @@ LockFile::LockFile(const std::string& pa
     errno = 0;
     int flags=create ? O_WRONLY|O_CREAT|O_NOFOLLOW : O_RDWR;
     int fd = ::open(path.c_str(), flags, 0644);
-    if (fd < 0) throw ErrnoException("Cannot open " + path, errno);
+    if (fd < 0) throw ErrnoException("Cannot open lock file " + path, errno);
     if (::lockf(fd, F_TLOCK, 0) < 0) {
         ::close(fd);
         throw ErrnoException("Cannot lock " + path, errno);

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/posix/MemStat.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/posix/MemStat.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/posix/MemStat.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/posix/MemStat.cpp Fri Aug  3 12:13:32 2012
@@ -20,6 +20,7 @@
  */
 
 #include "qpid/sys/MemStat.h"
+
 #include <malloc.h>
 
 void qpid::sys::MemStat::loadMemInfo(qmf::org::apache::qpid::broker::Memory* object)
@@ -35,4 +36,3 @@ void qpid::sys::MemStat::loadMemInfo(qmf
     object->set_malloc_keepcost(info.keepcost);
 }
 
-

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/posix/SocketAddress.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/posix/SocketAddress.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/posix/SocketAddress.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/posix/SocketAddress.cpp Fri Aug  3 12:13:32 2012
@@ -35,14 +35,16 @@ namespace sys {
 SocketAddress::SocketAddress(const std::string& host0, const std::string& port0) :
     host(host0),
     port(port0),
-    addrInfo(0)
+    addrInfo(0),
+    currentAddrInfo(0)
 {
 }
 
 SocketAddress::SocketAddress(const SocketAddress& sa) :
     host(sa.host),
     port(sa.port),
-    addrInfo(0)
+    addrInfo(0),
+    currentAddrInfo(0)
 {
 }
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/posix/SystemInfo.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/posix/SystemInfo.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/posix/SystemInfo.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/posix/SystemInfo.cpp Fri Aug  3 12:13:32 2012
@@ -18,10 +18,11 @@
  *
  */
 
+#include "qpid/log/Statement.h"
 #include "qpid/sys/SystemInfo.h"
-
 #include "qpid/sys/posix/check.h"
-
+#include <set>
+#include <arpa/inet.h>
 #include <sys/ioctl.h>
 #include <sys/utsname.h>
 #include <sys/types.h> // For FreeBSD
@@ -33,6 +34,7 @@
 #include <fstream>
 #include <sstream>
 #include <netdb.h>
+#include <string.h>
 
 #ifndef HOST_NAME_MAX
 #  define HOST_NAME_MAX 256
@@ -59,48 +61,100 @@ bool SystemInfo::getLocalHostname (Addre
     return true;
 }
 
-static const string LOCALHOST("127.0.0.1");
+static const string LOOPBACK("127.0.0.1");
 static const string TCP("tcp");
 
+// Test IPv4 address for loopback
+inline bool IN_IS_ADDR_LOOPBACK(const ::in_addr* a) {
+    return ((ntohl(a->s_addr) & 0xff000000) == 0x7f000000);
+}
+
+inline bool isLoopback(const ::sockaddr* addr) {
+    switch (addr->sa_family) {
+        case AF_INET: return IN_IS_ADDR_LOOPBACK(&((const ::sockaddr_in*)(const void*)addr)->sin_addr);
+        case AF_INET6: return IN6_IS_ADDR_LOOPBACK(&((const ::sockaddr_in6*)(const void*)addr)->sin6_addr);
+        default: return false;
+    }
+}
+
 void SystemInfo::getLocalIpAddresses (uint16_t port,
                                       std::vector<Address> &addrList) {
     ::ifaddrs* ifaddr = 0;
     QPID_POSIX_CHECK(::getifaddrs(&ifaddr));
     for (::ifaddrs* ifap = ifaddr; ifap != 0; ifap = ifap->ifa_next) {
         if (ifap->ifa_addr == 0) continue;
-
+        if (isLoopback(ifap->ifa_addr)) continue;
         int family = ifap->ifa_addr->sa_family;
         switch (family) {
-        case AF_INET: {
-            char dispName[NI_MAXHOST];
-            int rc = ::getnameinfo(
-                        ifap->ifa_addr,
-                        (family == AF_INET)
-                            ? sizeof(struct sockaddr_in)
-                            : sizeof(struct sockaddr_in6),
-                        dispName, sizeof(dispName),
-                        0, 0, NI_NUMERICHOST);
-            if (rc != 0) {
-                throw QPID_POSIX_ERROR(rc);
+            case AF_INET6: {
+                // Ignore link local addresses as:
+                // * The scope id is illegal in URL syntax
+                // * Clients won't be able to use a link local address
+                //   without adding their own (potentially different) scope id
+                sockaddr_in6* sa6 = (sockaddr_in6*)(ifap->ifa_addr);
+                if (IN6_IS_ADDR_LINKLOCAL(&sa6->sin6_addr)) break;
+                // Fallthrough
             }
-            string addr(dispName);
-            if (addr != LOCALHOST) {
-                addrList.push_back(Address(TCP, addr, port));
-            }
-            break;
-        }
-        // TODO: Url parsing currently can't cope with IPv6 addresses so don't return them
-        // when it can cope move this line to above "case AF_INET:"
-        case AF_INET6:
-        default:
+            case AF_INET: {
+              char dispName[NI_MAXHOST];
+              int rc = ::getnameinfo(
+                  ifap->ifa_addr,
+                  (family == AF_INET)
+                  ? sizeof(struct sockaddr_in)
+                  : sizeof(struct sockaddr_in6),
+                  dispName, sizeof(dispName),
+                  0, 0, NI_NUMERICHOST);
+              if (rc != 0) {
+                  throw QPID_POSIX_ERROR(rc);
+              }
+              string addr(dispName);
+              addrList.push_back(Address(TCP, addr, port));
+              break;
+          }
+          default:
             continue;
         }
     }
-    freeifaddrs(ifaddr);
+    ::freeifaddrs(ifaddr);
 
     if (addrList.empty()) {
-        addrList.push_back(Address(TCP, LOCALHOST, port));
+        addrList.push_back(Address(TCP, LOOPBACK, port));
+    }
+}
+
+namespace {
+struct AddrInfo {
+    struct addrinfo* ptr;
+    AddrInfo(const std::string& host) : ptr(0) {
+        ::addrinfo hints;
+        ::memset(&hints, 0, sizeof(hints));
+        hints.ai_family = AF_UNSPEC; // Allow both IPv4 and IPv6
+        if (::getaddrinfo(host.c_str(), NULL, &hints, &ptr) != 0)
+            ptr = 0;
+    }
+    ~AddrInfo() { if (ptr) ::freeaddrinfo(ptr); }
+};
+}
+
+bool SystemInfo::isLocalHost(const std::string& host) {
+    std::vector<Address> myAddrs;
+    getLocalIpAddresses(0, myAddrs);
+    std::set<string> localHosts;
+    for (std::vector<Address>::const_iterator i = myAddrs.begin(); i != myAddrs.end(); ++i)
+        localHosts.insert(i->host);
+    // Resolve host
+    AddrInfo ai(host);
+    if (!ai.ptr) return false;
+    for (struct addrinfo *res = ai.ptr; res != NULL; res = res->ai_next) {
+        if (isLoopback(res->ai_addr)) return true;
+        // Get string form of IP addr
+        char addr[NI_MAXHOST] = "";
+        int error = ::getnameinfo(res->ai_addr, res->ai_addrlen, addr, NI_MAXHOST, NULL, 0,
+                                  NI_NUMERICHOST | NI_NUMERICSERV);
+        if (error) return false;
+        if (localHosts.find(addr) != localHosts.end()) return true;
     }
+    return false;
 }
 
 void SystemInfo::getSystemId (std::string &osName,

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslHandler.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslHandler.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslHandler.cpp Fri Aug  3 12:13:32 2012
@@ -19,9 +19,9 @@
  *
  */
 #include "qpid/sys/ssl/SslHandler.h"
-
 #include "qpid/sys/ssl/SslIo.h"
 #include "qpid/sys/ssl/SslSocket.h"
+#include "qpid/sys/Timer.h"
 #include "qpid/framing/AMQP_HighestVersion.h"
 #include "qpid/framing/ProtocolInitiation.h"
 #include "qpid/log/Statement.h"
@@ -42,6 +42,24 @@ struct Buff : public SslIO::BufferBase {
     { delete [] bytes;}
 };
 
+struct ProtocolTimeoutTask : public sys::TimerTask {
+    SslHandler& handler;
+    std::string id;
+
+    ProtocolTimeoutTask(const std::string& i, const Duration& timeout, SslHandler& h) :
+    TimerTask(timeout, "ProtocolTimeout"),
+        handler(h),
+        id(i)
+    {}
+
+    void fire() {
+        // If this fires it means that we didn't negotiate the connection in the timeout period
+        // Schedule closing the connection for the io thread
+        QPID_LOG(error, "Connection " << id << " No protocol received closing");
+        handler.abort();
+    }
+};
+
 SslHandler::SslHandler(std::string id, ConnectionCodec::Factory* f, bool _nodict) :
     identifier(id),
     aio(0),
@@ -55,12 +73,18 @@ SslHandler::SslHandler(std::string id, C
 SslHandler::~SslHandler() {
     if (codec)
         codec->closed();
+    if (timeoutTimerTask)
+        timeoutTimerTask->cancel();
     delete codec;
 }
 
-void SslHandler::init(SslIO* a, int numBuffs) {
+void SslHandler::init(SslIO* a, Timer& timer, uint32_t maxTime, int numBuffs) {
     aio = a;
 
+    // Start timer for this connection
+    timeoutTimerTask = new ProtocolTimeoutTask(identifier, maxTime*TIME_MSEC, *this);
+    timer.add(timeoutTimerTask);
+
     // Give connection some buffers to use
     for (int i = 0; i < numBuffs; i++) {
         aio->queueReadBuffer(new Buff);
@@ -80,8 +104,10 @@ void SslHandler::write(const framing::Pr
 }
 
 void SslHandler::abort() {
-    // TODO: can't implement currently as underlying functionality not implemented
-    // aio->requestCallback(boost::bind(&SslHandler::eof, this, _1));
+    // Don't disconnect if we're already disconnecting
+    if (!readError) {
+        aio->requestCallback(boost::bind(&SslHandler::eof, this, _1));
+    }
 }
 void SslHandler::activateOutput() {
     aio->notifyPendingWrite();
@@ -109,6 +135,9 @@ void SslHandler::readbuff(SslIO& , SslIO
         framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
         framing::ProtocolInitiation protocolInit;
         if (protocolInit.decode(in)) {
+            // We've just got the protocol negotiation so we can cancel the timeout for that
+            timeoutTimerTask->cancel();
+
             decoded = in.getPosition();
             QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")");
             try {
@@ -169,6 +198,10 @@ void SslHandler::idle(SslIO&){
     if (isClient && codec == 0) {
         codec = factory->create(*this, identifier, getSecuritySettings(aio));
         write(framing::ProtocolInitiation(codec->getVersion()));
+        // We've just sent the protocol negotiation so we can cancel the timeout for that
+        // This is not ideal, because we've not received anything yet, but heartbeats will
+        // be active soon
+        timeoutTimerTask->cancel();
         return;
     }
     if (codec == 0) return;

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslHandler.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslHandler.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslHandler.h Fri Aug  3 12:13:32 2012
@@ -25,6 +25,8 @@
 #include "qpid/sys/ConnectionCodec.h"
 #include "qpid/sys/OutputControl.h"
 
+#include <boost/intrusive_ptr.hpp>
+
 namespace qpid {
 
 namespace framing {
@@ -32,6 +34,10 @@ namespace framing {
 }
 
 namespace sys {
+
+class Timer;
+class TimerTask;
+
 namespace ssl {
 
 class SslIO;
@@ -46,6 +52,7 @@ class SslHandler : public OutputControl 
     bool readError;
     bool isClient;
     bool nodict;
+    boost::intrusive_ptr<sys::TimerTask> timeoutTimerTask;
 
     void write(const framing::ProtocolInitiation&);
     qpid::sys::SecuritySettings getSecuritySettings(SslIO* aio);
@@ -53,7 +60,7 @@ class SslHandler : public OutputControl 
   public:
     SslHandler(std::string id, ConnectionCodec::Factory* f, bool nodict);
     ~SslHandler();
-    void init(SslIO* a, int numBuffs);
+    void init(SslIO* a, Timer& timer, uint32_t maxTime, int numBuffs);
 
     void setClient() { isClient = true; }
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslIo.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslIo.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslIo.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslIo.cpp Fri Aug  3 12:13:32 2012
@@ -257,6 +257,18 @@ void SslIO::queueWriteClose() {
     DispatchHandle::rewatchWrite();
 }
 
+void SslIO::requestCallback(RequestCallback callback) {
+    // TODO creating a function object every time isn't all that
+    // efficient - if this becomes heavily used do something better (what?)
+    assert(callback);
+    DispatchHandle::call(boost::bind(&SslIO::requestedCall, this, callback));
+}
+
+void SslIO::requestedCall(RequestCallback callback) {
+    assert(callback);
+    callback(*this);
+}
+
 /** Return a queued buffer if there are enough
  * to spare
  */

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslIo.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslIo.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslIo.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/ssl/SslIo.h Fri Aug  3 12:13:32 2012
@@ -125,6 +125,7 @@ public:
     typedef boost::function2<void, SslIO&, const SslSocket&> ClosedCallback;
     typedef boost::function1<void, SslIO&> BuffersEmptyCallback;
     typedef boost::function1<void, SslIO&> IdleCallback;
+    typedef boost::function1<void, SslIO&> RequestCallback;
 
 
 private:
@@ -159,6 +160,7 @@ public:
     void notifyPendingWrite();
     void queueWriteClose();
     bool writeQueueEmpty() { return writeQueue.empty(); }
+    void requestCallback(RequestCallback);
     BufferBase* getQueuedBuffer();
 
     qpid::sys::SecuritySettings getSecuritySettings();
@@ -168,6 +170,7 @@ private:
     void readable(qpid::sys::DispatchHandle& handle);
     void writeable(qpid::sys::DispatchHandle& handle);
     void disconnected(qpid::sys::DispatchHandle& handle);
+    void requestedCall(RequestCallback);
     void close(qpid::sys::DispatchHandle& handle);
 };
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/unordered_map.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/unordered_map.h?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/unordered_map.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/unordered_map.h Fri Aug  3 12:13:32 2012
@@ -23,6 +23,8 @@
 
 #ifdef _MSC_VER
 #  include <unordered_map>
+#elif defined(__SUNPRO_CC)
+#  include <boost/tr1/unordered_map.hpp>
 #else
 #  include <tr1/unordered_map>
 #endif /* _MSC_VER */

Modified: qpid/branches/asyncstore/cpp/src/qpid/sys/windows/IocpPoller.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/sys/windows/IocpPoller.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/sys/windows/IocpPoller.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/sys/windows/IocpPoller.cpp Fri Aug  3 12:13:32 2012
@@ -96,6 +96,7 @@ void Poller::shutdown() {
     // Allow sloppy code to shut us down more than once.
     if (impl->isShutdown)
         return;
+    impl->isShutdown = true;
     ULONG_PTR key = 1;    // Tell wait() it's a shutdown, not I/O
     PostQueuedCompletionStatus(impl->iocp, 0, key, 0);
 }
@@ -110,7 +111,7 @@ bool Poller::interrupt(PollerHandle&) {
 }
 
 void Poller::run() {
-    do {
+    while (!impl->isShutdown) {
         Poller::Event event = this->wait();
 
         // Handle shutdown
@@ -124,7 +125,7 @@ void Poller::run() {
           // This should be impossible
           assert(false);
         }
-    } while (true);
+    }
 }
 
 void Poller::monitorHandle(PollerHandle& handle, Direction dir) {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org