You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2011/09/19 17:13:38 UTC

svn commit: r1172657 [3/21] - in /qpid/branches/qpid-3346/qpid: ./ cpp/ cpp/bindings/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qpid/dotnet/ cpp/bindings/qpid/dotnet/examples/csharp.direct.receiver/Properties/ cpp/bindings/qpid/dotnet/examples/cshar...

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qmf/AgentSession.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qmf/AgentSession.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qmf/AgentSession.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qmf/AgentSession.cpp Mon Sep 19 15:13:18 2011
@@ -19,133 +19,7 @@
  *
  */
 
-#include "qpid/RefCounted.h"
-#include "qmf/PrivateImplRef.h"
-#include "qmf/exceptions.h"
-#include "qmf/AgentSession.h"
-#include "qmf/AgentEventImpl.h"
-#include "qmf/SchemaIdImpl.h"
-#include "qmf/SchemaImpl.h"
-#include "qmf/DataAddrImpl.h"
-#include "qmf/DataImpl.h"
-#include "qmf/QueryImpl.h"
-#include "qmf/agentCapability.h"
-#include "qmf/constants.h"
-#include "qpid/sys/Mutex.h"
-#include "qpid/sys/Condition.h"
-#include "qpid/sys/Thread.h"
-#include "qpid/sys/Runnable.h"
-#include "qpid/log/Statement.h"
-#include "qpid/messaging/Connection.h"
-#include "qpid/messaging/Session.h"
-#include "qpid/messaging/Receiver.h"
-#include "qpid/messaging/Sender.h"
-#include "qpid/messaging/Message.h"
-#include "qpid/messaging/AddressParser.h"
-#include "qpid/management/Buffer.h"
-#include <queue>
-#include <map>
-#include <set>
-#include <iostream>
-#include <memory>
-
-using namespace std;
-using namespace qpid::messaging;
-using namespace qmf;
-using qpid::types::Variant;
-
-namespace qmf {
-    class AgentSessionImpl : public virtual qpid::RefCounted, public qpid::sys::Runnable {
-    public:
-        ~AgentSessionImpl();
-
-        //
-        // Methods from API handle
-        //
-        AgentSessionImpl(Connection& c, const string& o);
-        void setDomain(const string& d) { checkOpen(); domain = d; }
-        void setVendor(const string& v) { checkOpen(); attributes["_vendor"] = v; }
-        void setProduct(const string& p) { checkOpen(); attributes["_product"] = p; }
-        void setInstance(const string& i) { checkOpen(); attributes["_instance"] = i; }
-        void setAttribute(const string& k, const qpid::types::Variant& v) { checkOpen(); attributes[k] = v; }
-        const string& getName() const { return agentName; }
-        void open();
-        void close();
-        bool nextEvent(AgentEvent& e, Duration t);
-        int pendingEvents() const;
-
-        void registerSchema(Schema& s);
-        DataAddr addData(Data& d, const string& n, bool persist);
-        void delData(const DataAddr&);
-
-        void authAccept(AgentEvent& e);
-        void authReject(AgentEvent& e, const string& m);
-        void raiseException(AgentEvent& e, const string& s);
-        void raiseException(AgentEvent& e, const Data& d);
-        void response(AgentEvent& e, const Data& d);
-        void complete(AgentEvent& e);
-        void methodSuccess(AgentEvent& e);
-        void raiseEvent(const Data& d);
-        void raiseEvent(const Data& d, int s);
-
-    private:
-        typedef map<DataAddr, Data, DataAddrCompare> DataIndex;
-        typedef map<SchemaId, Schema, SchemaIdCompare> SchemaMap;
-
-        mutable qpid::sys::Mutex lock;
-        qpid::sys::Condition cond;
-        Connection connection;
-        Session session;
-        Sender directSender;
-        Sender topicSender;
-        string domain;
-        Variant::Map attributes;
-        Variant::Map options;
-        string agentName;
-        bool opened;
-        queue<AgentEvent> eventQueue;
-        qpid::sys::Thread* thread;
-        bool threadCanceled;
-        uint32_t bootSequence;
-        uint32_t interval;
-        uint64_t lastHeartbeat;
-        uint64_t lastVisit;
-        bool forceHeartbeat;
-        bool externalStorage;
-        bool autoAllowQueries;
-        bool autoAllowMethods;
-        uint32_t maxSubscriptions;
-        uint32_t minSubInterval;
-        uint32_t subLifetime;
-        bool publicEvents;
-        bool listenOnDirect;
-        bool strictSecurity;
-        uint64_t schemaUpdateTime;
-        string directBase;
-        string topicBase;
-
-        SchemaMap schemata;
-        DataIndex globalIndex;
-        map<SchemaId, DataIndex, SchemaIdCompareNoHash> schemaIndex;
-
-        void checkOpen();
-        void setAgentName();
-        void enqueueEvent(const AgentEvent&);
-        void handleLocateRequest(const Variant::List& content, const Message& msg);
-        void handleMethodRequest(const Variant::Map& content, const Message& msg);
-        void handleQueryRequest(const Variant::Map& content, const Message& msg);
-        void handleSchemaRequest(AgentEvent&);
-        void handleV1SchemaRequest(qpid::management::Buffer&, uint32_t, const Message&);
-        void dispatch(Message);
-        void sendHeartbeat();
-        void send(Message, const Address&);
-        void flushResponses(AgentEvent&, bool);
-        void periodicProcessing(uint64_t);
-        void run();
-    };
-}
-
-typedef qmf::PrivateImplRef<AgentSession> PI;
+#include "qmf/AgentSessionImpl.h"
 
 AgentSession::AgentSession(AgentSessionImpl* impl) { PI::ctor(*this, impl); }
 AgentSession::AgentSession(const AgentSession& s) : qmf::Handle<AgentSessionImpl>() { PI::copy(*this, s); }
@@ -181,11 +55,11 @@ void AgentSession::raiseEvent(const Data
 //========================================================================================
 
 AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) :
-    connection(c), domain("default"), opened(false), thread(0), threadCanceled(false),
+    connection(c), domain("default"), opened(false), eventNotifier(0), thread(0), threadCanceled(false),
     bootSequence(1), interval(60), lastHeartbeat(0), lastVisit(0), forceHeartbeat(false),
     externalStorage(false), autoAllowQueries(true), autoAllowMethods(true),
     maxSubscriptions(64), minSubInterval(3000), subLifetime(300), publicEvents(true),
-    listenOnDirect(true), strictSecurity(false),
+    listenOnDirect(true), strictSecurity(false), maxThreadWaitTime(5),
     schemaUpdateTime(uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())))
 {
     //
@@ -246,7 +120,14 @@ AgentSessionImpl::AgentSessionImpl(Conne
         iter = optMap.find("strict-security");
         if (iter != optMap.end())
             strictSecurity = iter->second.asBool();
+
+        iter = optMap.find("max-thread-wait-time");
+        if (iter != optMap.end())
+            maxThreadWaitTime = iter->second.asUint32();
     }
+
+    if (maxThreadWaitTime > interval)
+        maxThreadWaitTime = interval;
 }
 
 
@@ -254,6 +135,11 @@ AgentSessionImpl::~AgentSessionImpl()
 {
     if (opened)
         close();
+
+    if (thread) {
+        thread->join();
+        delete thread;
+    }
 }
 
 
@@ -262,6 +148,12 @@ void AgentSessionImpl::open()
     if (opened)
         throw QmfException("The session is already open");
 
+    // If the thread exists, join and delete it before creating a new one.
+    if (thread) {
+        thread->join();
+        delete thread;
+    }
+
     const string addrArgs(";{create:never,node:{type:topic}}");
     const string routableAddr("direct-agent.route." + qpid::types::Uuid(true).str());
     attributes["_direct_subject"] = routableAddr;
@@ -299,34 +191,47 @@ void AgentSessionImpl::open()
 }
 
 
-void AgentSessionImpl::close()
+void AgentSessionImpl::closeAsync()
 {
     if (!opened)
         return;
 
-    // Stop and join the receiver thread
+    // Stop the receiver thread.  Don't join it until the destructor is called or open() is called.
     threadCanceled = true;
-    thread->join();
-    delete thread;
-
-    // Close the AMQP session
-    session.close();
     opened = false;
 }
 
 
+void AgentSessionImpl::close()
+{
+    closeAsync();
+
+    if (thread) {
+        thread->join();
+        delete thread;
+        thread = 0;
+    }
+}
+
+
 bool AgentSessionImpl::nextEvent(AgentEvent& event, Duration timeout)
 {
     uint64_t milliseconds = timeout.getMilliseconds();
     qpid::sys::Mutex::ScopedLock l(lock);
 
-    if (eventQueue.empty() && milliseconds > 0)
-        cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(),
-                                           qpid::sys::Duration(milliseconds * qpid::sys::TIME_MSEC)));
+    if (eventQueue.empty() && milliseconds > 0) {
+        int64_t nsecs(qpid::sys::TIME_INFINITE);
+        if ((uint64_t)(nsecs / 1000000) > milliseconds)
+            nsecs = (int64_t) milliseconds * 1000000;
+        qpid::sys::Duration then(nsecs);
+        cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), then));
+    }
 
     if (!eventQueue.empty()) {
         event = eventQueue.front();
         eventQueue.pop();
+        if (eventQueue.empty())
+            alertEventNotifierLH(false);
         return true;
     }
 
@@ -341,6 +246,19 @@ int AgentSessionImpl::pendingEvents() co
 }
 
 
+void AgentSessionImpl::setEventNotifier(EventNotifierImpl* notifier)
+{
+    qpid::sys::Mutex::ScopedLock l(lock);
+    eventNotifier = notifier;
+}
+
+EventNotifierImpl* AgentSessionImpl::getEventNotifier() const
+{
+    qpid::sys::Mutex::ScopedLock l(lock);
+    return eventNotifier;
+}
+
+
 void AgentSessionImpl::registerSchema(Schema& schema)
 {
     if (!schema.isFinalized())
@@ -596,8 +514,10 @@ void AgentSessionImpl::enqueueEvent(cons
     qpid::sys::Mutex::ScopedLock l(lock);
     bool notify = eventQueue.empty();
     eventQueue.push(event);
-    if (notify)
+    if (notify) {
         cond.notify();
+        alertEventNotifierLH(true);
+    }
 }
 
 
@@ -1041,6 +961,13 @@ void AgentSessionImpl::periodicProcessin
 }
 
 
+void AgentSessionImpl::alertEventNotifierLH(bool readable)
+{
+    if (eventNotifier)
+        eventNotifier->setReadable(readable);
+}
+
+
 void AgentSessionImpl::run()
 {
     QPID_LOG(debug, "AgentSession thread started for agent " << agentName);
@@ -1050,7 +977,7 @@ void AgentSessionImpl::run()
             periodicProcessing((uint64_t) qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()) / qpid::sys::TIME_SEC);
 
             Receiver rx;
-            bool valid = session.nextReceiver(rx, Duration::SECOND);
+            bool valid = session.nextReceiver(rx, Duration::SECOND * maxThreadWaitTime);
             if (threadCanceled)
                 break;
             if (valid) {
@@ -1067,6 +994,19 @@ void AgentSessionImpl::run()
         enqueueEvent(AgentEvent(new AgentEventImpl(AGENT_THREAD_FAILED)));
     }
 
+    session.close();
     QPID_LOG(debug, "AgentSession thread exiting for agent " << agentName);
 }
 
+
+AgentSessionImpl& AgentSessionImplAccess::get(AgentSession& session)
+{
+    return *session.impl;
+}
+
+
+const AgentSessionImpl& AgentSessionImplAccess::get(const AgentSession& session)
+{
+    return *session.impl;
+}
+

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qmf/ConsoleSession.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qmf/ConsoleSession.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qmf/ConsoleSession.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qmf/ConsoleSession.cpp Mon Sep 19 15:13:18 2011
@@ -66,9 +66,9 @@ Subscription ConsoleSession::subscribe(c
 //========================================================================================
 
 ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) :
-    connection(c), domain("default"), maxAgentAgeMinutes(5), listenOnDirect(true), strictSecurity(false),
-    opened(false), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0),
-    connectedBrokerInAgentList(false), schemaCache(new SchemaCache())
+    connection(c), domain("default"), maxAgentAgeMinutes(5), listenOnDirect(true), strictSecurity(false), maxThreadWaitTime(5),
+    opened(false), eventNotifier(0), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0),
+    connectedBrokerInAgentList(false), schemaCache(new SchemaCache()), nextCorrelator(1)
 {
     if (!options.empty()) {
         qpid::messaging::AddressParser parser(options);
@@ -92,7 +92,14 @@ ConsoleSessionImpl::ConsoleSessionImpl(C
         iter = optMap.find("strict-security");
         if (iter != optMap.end())
             strictSecurity = iter->second.asBool();
+
+        iter = optMap.find("max-thread-wait-time");
+        if (iter != optMap.end())
+            maxThreadWaitTime = iter->second.asUint32();
     }
+
+    if (maxThreadWaitTime > 60)
+        maxThreadWaitTime = 60;
 }
 
 
@@ -100,6 +107,11 @@ ConsoleSessionImpl::~ConsoleSessionImpl(
 {
     if (opened)
         close();
+
+    if (thread) {
+        thread->join();
+        delete thread;
+    }
 }
 
 
@@ -154,6 +166,12 @@ void ConsoleSessionImpl::open()
     if (opened)
         throw QmfException("The session is already open");
 
+    // If the thread exists, join and delete it before creating a new one.
+    if (thread) {
+        thread->join();
+        delete thread;
+    }
+
     // Establish messaging addresses
     directBase = "qmf." + domain + ".direct";
     topicBase = "qmf." + domain + ".topic";
@@ -182,45 +200,57 @@ void ConsoleSessionImpl::open()
 
     // Start the receiver thread
     threadCanceled = false;
+    opened = true;
     thread = new qpid::sys::Thread(*this);
 
     // Send an agent_locate to direct address 'broker' to identify the connected-broker-agent.
     sendBrokerLocate();
     if (agentQuery)
         sendAgentLocate();
-
-    opened = true;
 }
 
 
-void ConsoleSessionImpl::close()
+void ConsoleSessionImpl::closeAsync()
 {
     if (!opened)
         throw QmfException("The session is already closed");
 
-    // Stop and join the receiver thread
+    // Stop the receiver thread.  Don't join it until the destructor is called or open() is called.
     threadCanceled = true;
-    thread->join();
-    delete thread;
-
-    // Close the AMQP session
-    session.close();
     opened = false;
 }
 
 
+void ConsoleSessionImpl::close()
+{
+    closeAsync();
+
+    if (thread) {
+        thread->join();
+        delete thread;
+        thread = 0;
+    }
+}
+
+
 bool ConsoleSessionImpl::nextEvent(ConsoleEvent& event, Duration timeout)
 {
     uint64_t milliseconds = timeout.getMilliseconds();
     qpid::sys::Mutex::ScopedLock l(lock);
 
-    if (eventQueue.empty() && milliseconds > 0)
-        cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(),
-                                           qpid::sys::Duration(milliseconds * qpid::sys::TIME_MSEC)));
+    if (eventQueue.empty() && milliseconds > 0) {
+        int64_t nsecs(qpid::sys::TIME_INFINITE);
+        if ((uint64_t)(nsecs / 1000000) > milliseconds)
+            nsecs = (int64_t) milliseconds * 1000000;
+        qpid::sys::Duration then(nsecs);
+        cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), then));
+    }
 
     if (!eventQueue.empty()) {
         event = eventQueue.front();
         eventQueue.pop();
+        if (eventQueue.empty())
+            alertEventNotifierLH(false);
         return true;
     }
 
@@ -235,6 +265,20 @@ int ConsoleSessionImpl::pendingEvents() 
 }
 
 
+void ConsoleSessionImpl::setEventNotifier(EventNotifierImpl* notifier)
+{
+    qpid::sys::Mutex::ScopedLock l(lock);
+    eventNotifier = notifier;
+}
+
+
+EventNotifierImpl* ConsoleSessionImpl::getEventNotifier() const
+{
+    qpid::sys::Mutex::ScopedLock l(lock);
+    return eventNotifier;
+}
+
+
 uint32_t ConsoleSessionImpl::getAgentCount() const
 {
     qpid::sys::Mutex::ScopedLock l(lock);
@@ -276,8 +320,10 @@ void ConsoleSessionImpl::enqueueEventLH(
 {
     bool notify = eventQueue.empty();
     eventQueue.push(event);
-    if (notify)
+    if (notify) {
         cond.notify();
+        alertEventNotifierLH(true);
+    }
 }
 
 
@@ -586,6 +632,13 @@ void ConsoleSessionImpl::periodicProcess
 }
 
 
+void ConsoleSessionImpl::alertEventNotifierLH(bool readable)
+{
+    if (eventNotifier)
+        eventNotifier->setReadable(readable);
+}
+
+
 void ConsoleSessionImpl::run()
 {
     QPID_LOG(debug, "ConsoleSession thread started");
@@ -596,7 +649,7 @@ void ConsoleSessionImpl::run()
                                qpid::sys::TIME_SEC);
 
             Receiver rx;
-            bool valid = session.nextReceiver(rx, Duration::SECOND);
+            bool valid = session.nextReceiver(rx, Duration::SECOND * maxThreadWaitTime);
             if (threadCanceled)
                 break;
             if (valid) {
@@ -613,6 +666,18 @@ void ConsoleSessionImpl::run()
         enqueueEvent(ConsoleEvent(new ConsoleEventImpl(CONSOLE_THREAD_FAILED)));
     }
 
+    session.close();
     QPID_LOG(debug, "ConsoleSession thread exiting");
 }
 
+
+ConsoleSessionImpl& ConsoleSessionImplAccess::get(ConsoleSession& session)
+{
+  return *session.impl;
+}
+
+
+const ConsoleSessionImpl& ConsoleSessionImplAccess::get(const ConsoleSession& session)
+{
+  return *session.impl;
+}

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qmf/ConsoleSessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qmf/ConsoleSessionImpl.h?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qmf/ConsoleSessionImpl.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qmf/ConsoleSessionImpl.h Mon Sep 19 15:13:18 2011
@@ -27,6 +27,7 @@
 #include "qmf/SchemaId.h"
 #include "qmf/Schema.h"
 #include "qmf/ConsoleEventImpl.h"
+#include "qmf/EventNotifierImpl.h"
 #include "qmf/SchemaCache.h"
 #include "qmf/Query.h"
 #include "qpid/sys/Mutex.h"
@@ -41,9 +42,14 @@
 #include "qpid/messaging/Address.h"
 #include "qpid/management/Buffer.h"
 #include "qpid/types/Variant.h"
+
+#include <boost/shared_ptr.hpp>
 #include <map>
 #include <queue>
 
+using namespace boost;
+using namespace std;
+
 namespace qmf {
     class ConsoleSessionImpl : public virtual qpid::RefCounted, public qpid::sys::Runnable {
     public:
@@ -56,9 +62,14 @@ namespace qmf {
         void setDomain(const std::string& d) { domain = d; }
         void setAgentFilter(const std::string& f);
         void open();
+        void closeAsync();
         void close();
         bool nextEvent(ConsoleEvent& e, qpid::messaging::Duration t);
         int pendingEvents() const;
+
+        void setEventNotifier(EventNotifierImpl* notifier);
+        EventNotifierImpl* getEventNotifier() const;
+
         uint32_t getAgentCount() const;
         Agent getAgent(uint32_t i) const;
         Agent getConnectedBrokerAgent() const { return connectedBrokerAgent; }
@@ -76,9 +87,11 @@ namespace qmf {
         uint32_t maxAgentAgeMinutes;
         bool listenOnDirect;
         bool strictSecurity;
+        uint32_t maxThreadWaitTime;
         Query agentQuery;
         bool opened;
         std::queue<ConsoleEvent> eventQueue;
+        EventNotifierImpl* eventNotifier;
         qpid::sys::Thread* thread;
         bool threadCanceled;
         uint64_t lastVisit;
@@ -90,6 +103,8 @@ namespace qmf {
         std::string directBase;
         std::string topicBase;
         boost::shared_ptr<SchemaCache> schemaCache;
+        qpid::sys::Mutex corrlock;
+        uint32_t nextCorrelator;
 
         void enqueueEvent(const ConsoleEvent&);
         void enqueueEventLH(const ConsoleEvent&);
@@ -99,10 +114,17 @@ namespace qmf {
         void handleAgentUpdate(const std::string&, const qpid::types::Variant::Map&, const qpid::messaging::Message&);
         void handleV1SchemaResponse(qpid::management::Buffer&, uint32_t, const qpid::messaging::Message&);
         void periodicProcessing(uint64_t);
+        void alertEventNotifierLH(bool readable);
         void run();
+        uint32_t correlator() { qpid::sys::Mutex::ScopedLock l(corrlock); return nextCorrelator++; }
 
         friend class AgentImpl;
     };
+
+    struct ConsoleSessionImplAccess {
+        static ConsoleSessionImpl& get(ConsoleSession& session);
+        static const ConsoleSessionImpl& get(const ConsoleSession& session);
+    };
 }
 
 #endif

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qmf/DataAddr.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qmf/DataAddr.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qmf/DataAddr.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qmf/DataAddr.cpp Mon Sep 19 15:13:18 2011
@@ -36,7 +36,9 @@ DataAddr::~DataAddr() { PI::dtor(*this);
 DataAddr& DataAddr::operator=(const DataAddr& s) { return PI::assign(*this, s); }
 
 bool DataAddr::operator==(const DataAddr& o) { return *impl == *o.impl; }
+bool DataAddr::operator==(const DataAddr& o) const { return *impl == *o.impl; }
 bool DataAddr::operator<(const DataAddr& o) { return *impl < *o.impl; }
+bool DataAddr::operator<(const DataAddr& o) const { return *impl < *o.impl; }
 
 DataAddr::DataAddr(const qpid::types::Variant::Map& m) { PI::ctor(*this, new DataAddrImpl(m)); }
 DataAddr::DataAddr(const string& n, const string& a, uint32_t e) { PI::ctor(*this, new DataAddrImpl(n, a, e)); }
@@ -45,7 +47,7 @@ const string& DataAddr::getAgentName() c
 uint32_t DataAddr::getAgentEpoch() const { return impl->getAgentEpoch(); }
 Variant::Map DataAddr::asMap() const { return impl->asMap(); }
 
-bool DataAddrImpl::operator==(const DataAddrImpl& other)
+bool DataAddrImpl::operator==(const DataAddrImpl& other) const
 {
     return
         agentName == other.agentName &&
@@ -54,7 +56,7 @@ bool DataAddrImpl::operator==(const Data
 }
 
 
-bool DataAddrImpl::operator<(const DataAddrImpl& other)
+bool DataAddrImpl::operator<(const DataAddrImpl& other) const
 {
     if (agentName < other.agentName) return true;
     if (agentName > other.agentName) return false;

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qmf/DataAddrImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qmf/DataAddrImpl.h?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qmf/DataAddrImpl.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qmf/DataAddrImpl.h Mon Sep 19 15:13:18 2011
@@ -38,8 +38,8 @@ namespace qmf {
         //
         // Methods from API handle
         //
-        bool operator==(const DataAddrImpl&);
-        bool operator<(const DataAddrImpl&);
+        bool operator==(const DataAddrImpl&) const;
+        bool operator<(const DataAddrImpl&) const;
         DataAddrImpl(const qpid::types::Variant::Map&);
         DataAddrImpl(const std::string& _name, const std::string& _agentName, uint32_t _agentEpoch=0) :
             agentName(_agentName), name(_name), agentEpoch(_agentEpoch) {}

Propchange: qpid/branches/qpid-3346/qpid/cpp/src/qmf/engine/Agent.cpp
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Sep 19 15:13:18 2011
@@ -1,2 +1,2 @@
 /qpid/branches/qpid-2935/qpid/cpp/src/qmf/engine/Agent.cpp:1061302-1072333
-/qpid/trunk/qpid/cpp/src/qmf/engine/Agent.cpp:1156188
+/qpid/trunk/qpid/cpp/src/qmf/engine/Agent.cpp:1144319-1172654

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/Address.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/Address.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/Address.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/Address.cpp Mon Sep 19 15:13:18 2011
@@ -28,7 +28,13 @@ namespace qpid {
 const string Address::TCP("tcp");
 
 ostream& operator<<(ostream& os, const Address& a) {
-    return os << a.protocol << ":" << a.host << ":" << a.port;
+    // If the host is an IPv6 literal we need to print "[]" around it
+    // (we detect IPv6 literals because they contain ":" which is otherwise illegal)
+    if (a.host.find(':') != string::npos) {
+        return os << a.protocol << ":[" << a.host << "]:" << a.port;
+    } else {
+        return os << a.protocol << ":" << a.host << ":" << a.port;
+    }
 }
 
 bool operator==(const Address& x, const Address& y) {

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/Url.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/Url.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/Url.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/Url.cpp Mon Sep 19 15:13:18 2011
@@ -156,11 +156,12 @@ class UrlParser {
         return false;
     }
 
-    // TODO aconway 2008-11-20: this does not fully implement
-    // http://www.ietf.org/rfc/rfc3986.txt. Works for DNS names and
-    // ipv4 literals but won't handle ipv6.
+    // A liberal interpretation of http://www.ietf.org/rfc/rfc3986.txt.
+    // Works for DNS names and for ipv4 and ipv6 literals.
     // 
     bool host(string& h) {
+        if (ip6literal(h)) return true;
+
         const char* start=i;
         while (unreserved() || pctEncoded())
             ;
@@ -169,6 +170,22 @@ class UrlParser {
         return true;
     }
 
+    // This is a bit too liberal for IPv6 literal addresses, but probably good enough
+    bool ip6literal(string& h) {
+        if (literal("[")) {
+            const char* start = i;
+            while (hexDigit() || literal(":") || literal("."))
+                ;
+            const char* end = i;
+            if ( end-start < 2 ) return false; // Smallest valid address is "::"
+            if (literal("]")) {
+                h.assign(start, end);
+                return true;
+            }
+        }
+        return false;
+    }
+
     bool unreserved() { return (::isalnum(*i) || ::strchr("-._~", *i)) && advance(); }
 
     bool pctEncoded() { return literal("%") && hexDigit() && hexDigit(); }

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Mon Sep 19 15:13:18 2011
@@ -1378,13 +1378,26 @@ bool ManagementAgentImpl::ConnectionThre
 
 void ManagementAgentImpl::PublishThread::run()
 {
-    uint16_t    totalSleep;
+    uint16_t totalSleep;
+    uint16_t sleepTime;
 
     while (!shutdown) {
         agent.periodicProcessing();
         totalSleep = 0;
-        while (totalSleep++ < agent.getInterval() && !shutdown) {
-            ::sleep(1);
+
+        //
+        // Calculate a sleep time that is no greater than 5 seconds and
+        // no less than 1 second.
+        //
+        sleepTime = agent.getInterval();
+        if (sleepTime > 5)
+            sleepTime = 5;
+        else if (sleepTime == 0)
+            sleepTime = 1;
+
+        while (totalSleep < agent.getInterval() && !shutdown) {
+            ::sleep(sleepTime);
+            totalSleep += sleepTime;
         }
     }
 }

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp Mon Sep 19 15:13:18 2011
@@ -188,9 +188,10 @@ void SessionHandler::detach(const std::s
 void SessionHandler::detached(const std::string& name, uint8_t code) {
     CHECK_NAME(name, "session.detached");
     awaitingDetached = false;
-    if (code != session::DETACH_CODE_NORMAL)
+    if (code != session::DETACH_CODE_NORMAL) {
+        sendReady = receiveReady = false;
         channelException(convert(code), "session.detached from peer.");
-    else {
+    } else {
         handleDetach();
     }
 }

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h Mon Sep 19 15:13:18 2011
@@ -66,7 +66,7 @@ class QPID_COMMON_CLASS_EXTERN SessionHa
     QPID_COMMON_EXTERN void handleException(const qpid::SessionException& e);
 
     /** True if the handler is ready to send and receive */
-    bool ready() const;
+    QPID_COMMON_EXTERN bool ready() const;
 
     // Protocol methods
     QPID_COMMON_EXTERN void attach(const std::string& name, bool force);

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Bridge.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Bridge.cpp Mon Sep 19 15:13:18 2011
@@ -164,6 +164,12 @@ void Bridge::destroy()
     listener(this);
 }
 
+bool Bridge::isSessionReady() const
+{
+    SessionHandler& sessionHandler = conn->getChannel(id);
+    return sessionHandler.ready();
+}
+
 void Bridge::setPersistenceId(uint64_t pId) const
 {
     persistenceId = pId;

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Bridge.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Bridge.h?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Bridge.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Bridge.h Mon Sep 19 15:13:18 2011
@@ -59,6 +59,8 @@ public:
     void destroy();
     bool isDurable() { return args.i_durable; }
 
+    bool isSessionReady() const;
+
     management::ManagementObject* GetManagementObject() const;
     management::Manageable::status_t ManagementMethod(uint32_t methodId,
                                                       management::Args& args,

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Broker.cpp Mon Sep 19 15:13:18 2011
@@ -435,8 +435,9 @@ Manageable::status_t Broker::ManagementM
         _qmf::ArgsBrokerConnect& hp=
             dynamic_cast<_qmf::ArgsBrokerConnect&>(args);
 
-        QPID_LOG (debug, "Broker::connect()");
         string transport = hp.i_transport.empty() ? TCP_TRANSPORT : hp.i_transport;
+        QPID_LOG (debug, "Broker::connect() " << hp.i_host << ":" << hp.i_port << "; transport=" << transport <<
+                        "; durable=" << (hp.i_durable?"T":"F") << "; authMech=\"" << hp.i_authMechanism << "\"");
         if (!getProtocolFactory(transport)) {
             QPID_LOG(error, "Transport '" << transport << "' not supported");
             return  Manageable::STATUS_NOT_IMPLEMENTED;
@@ -455,7 +456,7 @@ Manageable::status_t Broker::ManagementM
         QPID_LOG (debug, "Broker::queueMoveMessages()");
         if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty, moveArgs.i_filter))
             status = Manageable::STATUS_OK;
-	else
+        else
             return Manageable::STATUS_PARAMETER_INVALID;
         break;
       }
@@ -804,6 +805,7 @@ bool Broker::deferDeliveryImpl(const std
 void Broker::setClusterTimer(std::auto_ptr<sys::Timer> t) {
     clusterTimer = t;
     queueCleaner.setTimer(clusterTimer.get());
+    dtxManager.setTimer(*clusterTimer.get());
 }
 
 const std::string Broker::TCP_TRANSPORT("tcp");
@@ -941,6 +943,9 @@ void Broker::deleteExchange(const std::s
             throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange delete request from " << userId));
     }
 
+    if (name.empty()) {
+        throw framing::InvalidArgumentException(QPID_MSG("Delete not allowed for default exchange"));
+    }
     Exchange::shared_ptr exchange(exchanges.get(name));
     if (!exchange) throw framing::NotFoundException(QPID_MSG("Delete failed. No such exchange: " << name));
     if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange."));
@@ -968,6 +973,9 @@ void Broker::bind(const std::string& que
         if (!acl->authorise(userId,acl::ACT_BIND,acl::OBJ_EXCHANGE,exchangeName,&params))
             throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange bind request from " << userId));
     }
+    if (exchangeName.empty()) {
+        throw framing::InvalidArgumentException(QPID_MSG("Bind not allowed for default exchange"));
+    }
 
     Queue::shared_ptr queue = queues.find(queueName);
     Exchange::shared_ptr exchange = exchanges.get(exchangeName);
@@ -998,13 +1006,15 @@ void Broker::unbind(const std::string& q
         if (!acl->authorise(userId,acl::ACT_UNBIND,acl::OBJ_EXCHANGE,exchangeName,&params) )
             throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange unbind request from " << userId));
     }
-
+    if (exchangeName.empty()) {
+        throw framing::InvalidArgumentException(QPID_MSG("Unbind not allowed for default exchange"));
+    }
     Queue::shared_ptr queue = queues.find(queueName);
     Exchange::shared_ptr exchange = exchanges.get(exchangeName);
     if (!queue) {
-        throw framing::NotFoundException(QPID_MSG("Bind failed. No such queue: " << queueName));
+        throw framing::NotFoundException(QPID_MSG("Unbind failed. No such queue: " << queueName));
     } else if (!exchange) {
-        throw framing::NotFoundException(QPID_MSG("Bind failed. No such exchange: " << exchangeName));
+        throw framing::NotFoundException(QPID_MSG("Unbind failed. No such exchange: " << exchangeName));
     } else {
         if (exchange->unbind(queue, key, 0)) {
             if (exchange->isDurable() && queue->isDurable()) {

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Connection.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Connection.cpp Mon Sep 19 15:13:18 2011
@@ -156,16 +156,7 @@ Connection::~Connection()
 void Connection::received(framing::AMQFrame& frame) {
     // Received frame on connection so delay timeout
     restartTimeout();
-
-    if (frame.getChannel() == 0 && frame.getMethod()) {
-        adapter.handle(frame);
-    } else {
-        if (adapter.isOpen())
-            getChannel(frame.getChannel()).in(frame);
-        else
-            close(connection::CLOSE_CODE_FRAMING_ERROR, "Connection not yet open, invalid frame received.");
-    }
-
+    adapter.handle(frame);
     if (isLink) //i.e. we are acting as the client to another broker
         recordFromServer(frame);
     else

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp Mon Sep 19 15:13:18 2011
@@ -68,8 +68,15 @@ void ConnectionHandler::handle(framing::
     AMQMethodBody* method=frame.getBody()->getMethod();
     Connection::ErrorListener* errorListener = handler->connection.getErrorListener();
     try{
-        if (!invoke(static_cast<AMQP_AllOperations::ConnectionHandler&>(*handler.get()), *method)) {
+        if (method && invoke(
+                static_cast<AMQP_AllOperations::ConnectionHandler&>(*handler), *method)) {
+            // This is a connection control frame, nothing more to do.
+        } else if (isOpen()) {
             handler->connection.getChannel(frame.getChannel()).in(frame);
+        } else {
+            handler->proxy.close(
+                connection::CLOSE_CODE_FRAMING_ERROR,
+                "Connection not yet open, invalid frame received.");
         }
     }catch(ConnectionException& e){
         if (errorListener) errorListener->connectionError(e.what());
@@ -185,7 +192,7 @@ void ConnectionHandler::Handler::secureO
 void ConnectionHandler::Handler::tuneOk(uint16_t /*channelmax*/,
     uint16_t framemax, uint16_t heartbeat)
 {
-    connection.setFrameMax(framemax);
+    if (framemax) connection.setFrameMax(framemax);
     connection.setHeartbeatInterval(heartbeat);
 }
 

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Mon Sep 19 15:13:18 2011
@@ -75,7 +75,7 @@ void DeliveryRecord::deliver(framing::Fr
 {
     id = deliveryId;
     if (msg.payload->getRedelivered()){
-        msg.payload->getProperties<framing::DeliveryProperties>()->setRedelivered(true);
+        msg.payload->setRedelivered();
     }
     msg.payload->adjustTtl();
 

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DeliveryRecord.h?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DeliveryRecord.h Mon Sep 19 15:13:18 2011
@@ -90,7 +90,7 @@ class DeliveryRecord
 
     bool isAcquired() const { return acquired; }
     bool isComplete() const { return completed; }
-    bool isRedundant() const { return ended && (!windowing || completed); }     // msg no longer needed - can discard
+    bool isRedundant() const { return ended && (!windowing || completed || cancelled); }
     bool isCancelled() const { return cancelled; }
     bool isAccepted() const { return !acceptExpected; }
     bool isEnded() const { return ended; }

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DirectExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DirectExchange.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DirectExchange.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DirectExchange.cpp Mon Sep 19 15:13:18 2011
@@ -139,6 +139,9 @@ bool DirectExchange::unbind(Queue::share
             if (mgmtExchange != 0) {
                 mgmtExchange->dec_bindingCount();
             }
+            if (bk.queues.empty()) {
+                bindings.erase(routingKey);
+            }
         } else {
             return false;
         }

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DtxAck.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DtxAck.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DtxAck.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DtxAck.cpp Mon Sep 19 15:13:18 2011
@@ -32,6 +32,10 @@ DtxAck::DtxAck(const qpid::framing::Sequ
                    not1(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked)));
 }
 
+DtxAck::DtxAck(DeliveryRecords& unacked)  // keeps a copy of every record in 'unacked'
+    : pending(unacked)  // copy-construct the member directly rather than default-construct and assign
+{}
+
 bool DtxAck::prepare(TransactionContext* ctxt) throw()
 {
     try{

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DtxAck.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DtxAck.h?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DtxAck.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DtxAck.h Mon Sep 19 15:13:18 2011
@@ -1,3 +1,6 @@
+#ifndef QPID_BROKER_DTXACK_H
+#define QPID_BROKER_DTXACK_H
+
 /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -18,9 +21,6 @@
  * under the License.
  *
  */
-#ifndef _DtxAck_
-#define _DtxAck_
-
 #include <algorithm>
 #include <functional>
 #include <list>
@@ -29,20 +29,21 @@
 #include "qpid/broker/TxOp.h"
 
 namespace qpid {
-    namespace broker {
-        class DtxAck : public TxOp{
-            DeliveryRecords pending;
+namespace broker {
+class DtxAck : public TxOp{
+    DeliveryRecords pending;
 
-        public:
-            DtxAck(const framing::SequenceSet& acked, DeliveryRecords& unacked);
-            virtual bool prepare(TransactionContext* ctxt) throw();
-            virtual void commit() throw();
-            virtual void rollback() throw();
-            virtual ~DtxAck(){}
-            virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
-        };
-    }
-}
+  public:
+    DtxAck(const framing::SequenceSet& acked, DeliveryRecords& unacked);
+    DtxAck(DeliveryRecords& unacked);
+    virtual bool prepare(TransactionContext* ctxt) throw();
+    virtual void commit() throw();
+    virtual void rollback() throw();
+    virtual ~DtxAck(){}
+    virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
+    const DeliveryRecords& getPending() const { return pending; }
+};
 
+}} // qpid::broker
 
-#endif
+#endif  /*!QPID_BROKER_DTXACK_H*/

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DtxBuffer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DtxBuffer.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DtxBuffer.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DtxBuffer.cpp Mon Sep 19 15:13:18 2011
@@ -23,8 +23,11 @@
 using namespace qpid::broker;
 using qpid::sys::Mutex;
 
-DtxBuffer::DtxBuffer(const std::string& _xid) 
-    : xid(_xid), ended(false), suspended(false), failed(false), expired(false) {}
+DtxBuffer::DtxBuffer(  // builds a buffer for _xid with caller-supplied initial state flags
+    const std::string& _xid,
+    bool ended_, bool suspended_, bool failed_, bool expired_)
+    : xid(_xid), ended(ended_), suspended(suspended_), failed(failed_), expired(expired_)
+{}
 
 DtxBuffer::~DtxBuffer() {}
 
@@ -34,7 +37,7 @@ void DtxBuffer::markEnded() 
     ended = true; 
 }
 
-bool DtxBuffer::isEnded() 
+bool DtxBuffer::isEnded() const
 { 
     Mutex::ScopedLock locker(lock); 
     return ended; 
@@ -45,7 +48,7 @@ void DtxBuffer::setSuspended(bool isSusp
     suspended = isSuspended; 
 }
 
-bool DtxBuffer::isSuspended() 
+bool DtxBuffer::isSuspended() const
 { 
     return suspended; 
 }
@@ -58,13 +61,13 @@ void DtxBuffer::fail()
     ended = true;
 }
 
-bool DtxBuffer::isRollbackOnly()
+bool DtxBuffer::isRollbackOnly() const
 {
     Mutex::ScopedLock locker(lock); 
     return failed;
 }
 
-const std::string& DtxBuffer::getXid()
+std::string DtxBuffer::getXid() const
 {
     return xid;
 }
@@ -76,8 +79,13 @@ void DtxBuffer::timedout()
     fail();
 }
 
-bool DtxBuffer::isExpired()
+bool DtxBuffer::isExpired() const
 {
     Mutex::ScopedLock locker(lock); 
     return expired;
 }
+
+bool DtxBuffer::isFailed() const  // returns the 'failed' flag (the same flag isRollbackOnly() reports)
+{
+    return failed;  // NOTE(review): read without holding 'lock', unlike isRollbackOnly() -- confirm this is intended
+}

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DtxBuffer.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DtxBuffer.h?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DtxBuffer.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DtxBuffer.h Mon Sep 19 15:13:18 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,31 +26,34 @@
 #include "qpid/sys/Mutex.h"
 
 namespace qpid {
-    namespace broker {
-        class DtxBuffer : public TxBuffer{
-            sys::Mutex lock;
-            const std::string xid;
-            bool ended;
-            bool suspended;           
-            bool failed;
-            bool expired;
-
-        public:
-            typedef boost::shared_ptr<DtxBuffer> shared_ptr;
-
-            QPID_BROKER_EXTERN DtxBuffer(const std::string& xid = "");
-            QPID_BROKER_EXTERN ~DtxBuffer();
-            QPID_BROKER_EXTERN void markEnded();
-            bool isEnded();
-            void setSuspended(bool suspended);
-            bool isSuspended();
-            void fail();
-            bool isRollbackOnly();
-            void timedout();
-            bool isExpired();
-            const std::string& getXid();
-        };
-    }
+namespace broker {
+class DtxBuffer : public TxBuffer{  // a TxBuffer tagged with an xid plus ended/suspended/failed/expired flags
+    mutable sys::Mutex lock;  // mutable so that const accessors (isEnded, isRollbackOnly, isExpired) can lock it
+    const std::string xid;
+    bool ended;
+    bool suspended;
+    bool failed;
+    bool expired;
+
+  public:
+    typedef boost::shared_ptr<DtxBuffer> shared_ptr;
+
+    QPID_BROKER_EXTERN DtxBuffer(  // every state flag defaults to false; callers may supply initial values
+        const std::string& xid = "",
+        bool ended=false, bool suspended=false, bool failed=false, bool expired=false);
+    QPID_BROKER_EXTERN ~DtxBuffer();
+    QPID_BROKER_EXTERN void markEnded();
+    bool isEnded() const;
+    void setSuspended(bool suspended);
+    bool isSuspended() const;
+    void fail();
+    bool isRollbackOnly() const;  // reports 'failed' while holding 'lock'
+    void timedout();
+    bool isExpired() const;
+    bool isFailed() const;  // reports 'failed' without taking 'lock'
+    std::string getXid() const;  // returns a copy of the xid
+};
+}
 }
 
 

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DtxManager.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DtxManager.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DtxManager.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DtxManager.cpp Mon Sep 19 15:13:18 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -34,7 +34,7 @@ using qpid::ptr_map_ptr;
 using namespace qpid::broker;
 using namespace qpid::framing;
 
-DtxManager::DtxManager(qpid::sys::Timer& t) : store(0), timer(t) {}
+DtxManager::DtxManager(qpid::sys::Timer& t) : store(0), timer(&t) {}
 
 DtxManager::~DtxManager() {}
 
@@ -53,8 +53,8 @@ void DtxManager::recover(const std::stri
     createWork(xid)->recover(txn, ops);
 }
 
-bool DtxManager::prepare(const std::string& xid) 
-{ 
+bool DtxManager::prepare(const std::string& xid)
+{
     QPID_LOG(debug, "preparing: " << xid);
     try {
         return getWork(xid)->prepare();
@@ -64,8 +64,8 @@ bool DtxManager::prepare(const std::stri
     }
 }
 
-bool DtxManager::commit(const std::string& xid, bool onePhase) 
-{ 
+bool DtxManager::commit(const std::string& xid, bool onePhase)
+{
     QPID_LOG(debug, "committing: " << xid);
     try {
         bool result = getWork(xid)->commit(onePhase);
@@ -77,8 +77,8 @@ bool DtxManager::commit(const std::strin
     }
 }
 
-void DtxManager::rollback(const std::string& xid) 
-{ 
+void DtxManager::rollback(const std::string& xid)
+{
     QPID_LOG(debug, "rolling back: " << xid);
     try {
         getWork(xid)->rollback();
@@ -91,7 +91,7 @@ void DtxManager::rollback(const std::str
 
 DtxWorkRecord* DtxManager::getWork(const std::string& xid)
 {
-    Mutex::ScopedLock locker(lock); 
+    Mutex::ScopedLock locker(lock);
     WorkMap::iterator i = work.find(xid);
     if (i == work.end()) {
         throw NotFoundException(QPID_MSG("Unrecognised xid " << xid));
@@ -99,9 +99,14 @@ DtxWorkRecord* DtxManager::getWork(const
     return ptr_map_ptr(i);
 }
 
+bool DtxManager::exists(const std::string& xid) {  // true if 'work' holds an entry keyed by xid
+    Mutex::ScopedLock locker(lock);  // 'work' is searched while holding the manager lock
+    return  work.find(xid) != work.end();
+}
+
 void DtxManager::remove(const std::string& xid)
 {
-    Mutex::ScopedLock locker(lock); 
+    Mutex::ScopedLock locker(lock);
     WorkMap::iterator i = work.find(xid);
     if (i == work.end()) {
         throw NotFoundException(QPID_MSG("Unrecognised xid " << xid));
@@ -110,14 +115,15 @@ void DtxManager::remove(const std::strin
     }
 }
 
-DtxWorkRecord* DtxManager::createWork(std::string xid)
+DtxWorkRecord* DtxManager::createWork(const std::string& xid)
 {
-    Mutex::ScopedLock locker(lock); 
+    Mutex::ScopedLock locker(lock);
     WorkMap::iterator i = work.find(xid);
     if (i != work.end()) {
         throw NotAllowedException(QPID_MSG("Xid " << xid << " is already known (use 'join' to add work to an existing xid)"));
     } else {
-      return ptr_map_ptr(work.insert(xid, new DtxWorkRecord(xid, store)).first);
+        std::string ncxid = xid; // Work around const correctness problems in ptr_map.
+        return ptr_map_ptr(work.insert(ncxid, new DtxWorkRecord(ncxid, store)).first);
     }
 }
 
@@ -131,7 +137,7 @@ void DtxManager::setTimeout(const std::s
     }
     timeout = intrusive_ptr<DtxTimeout>(new DtxTimeout(secs, *this, xid));
     record->setTimeout(timeout);
-    timer.add(timeout);
+    timer->add(timeout);
 }
 
 uint32_t DtxManager::getTimeout(const std::string& xid)
@@ -142,7 +148,7 @@ uint32_t DtxManager::getTimeout(const st
 
 void DtxManager::timedout(const std::string& xid)
 {
-    Mutex::ScopedLock locker(lock); 
+    Mutex::ScopedLock locker(lock);
     WorkMap::iterator i = work.find(xid);
     if (i == work.end()) {
         QPID_LOG(warning, "Transaction timeout failed: no record for xid");
@@ -153,7 +159,7 @@ void DtxManager::timedout(const std::str
     }
 }
 
-DtxManager::DtxCleanup::DtxCleanup(uint32_t _timeout, DtxManager& _mgr, const std::string& _xid) 
+DtxManager::DtxCleanup::DtxCleanup(uint32_t _timeout, DtxManager& _mgr, const std::string& _xid)
     : TimerTask(qpid::sys::Duration(_timeout * qpid::sys::TIME_SEC),"DtxCleanup"), mgr(_mgr), xid(_xid) {}
 
 void DtxManager::DtxCleanup::fire()

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DtxManager.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DtxManager.h?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DtxManager.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DtxManager.h Mon Sep 19 15:13:18 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,8 +26,8 @@
 #include "qpid/broker/DtxWorkRecord.h"
 #include "qpid/broker/TransactionalStore.h"
 #include "qpid/framing/amqp_types.h"
-#include "qpid/sys/Timer.h"
 #include "qpid/sys/Mutex.h"
+#include "qpid/ptr_map.h"
 
 namespace qpid {
 namespace broker {
@@ -39,22 +39,21 @@ class DtxManager{
     {
         DtxManager& mgr;
         const std::string& xid;
-        
-        DtxCleanup(uint32_t timeout, DtxManager& mgr, const std::string& xid);    
+
+        DtxCleanup(uint32_t timeout, DtxManager& mgr, const std::string& xid);
         void fire();
     };
 
     WorkMap work;
     TransactionalStore* store;
     qpid::sys::Mutex lock;
-    qpid::sys::Timer& timer;
+    qpid::sys::Timer* timer;
 
     void remove(const std::string& xid);
-    DtxWorkRecord* getWork(const std::string& xid);
-    DtxWorkRecord* createWork(std::string xid);
+    DtxWorkRecord* createWork(const std::string& xid);
 
 public:
-    DtxManager(qpid::sys::Timer&);
+    DtxManager(sys::Timer&);
     ~DtxManager();
     void start(const std::string& xid, DtxBuffer::shared_ptr work);
     void join(const std::string& xid, DtxBuffer::shared_ptr work);
@@ -66,6 +65,15 @@ public:
     uint32_t getTimeout(const std::string& xid);
     void timedout(const std::string& xid);
     void setStore(TransactionalStore* store);
+    void setTimer(sys::Timer& t) { timer = &t; }
+
+    // Used by cluster for replication. each() calls f once per DtxWorkRecord held in 'work'.
+    template<class F> void each(F f) const {  // NOTE(review): does not take 'lock' -- confirm callers serialize access
+        for (WorkMap::const_iterator i = work.begin(); i != work.end(); ++i)
+            f(*ptr_map_ptr(i));
+    }
+    DtxWorkRecord* getWork(const std::string& xid);
+    bool exists(const std::string& xid);
 };
 
 }

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DtxTimeout.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DtxTimeout.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DtxTimeout.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DtxTimeout.cpp Mon Sep 19 15:13:18 2011
@@ -25,7 +25,7 @@
 using namespace qpid::broker;
 
 DtxTimeout::DtxTimeout(uint32_t _timeout, DtxManager& _mgr, const std::string& _xid) 
-    : TimerTask(qpid::sys::Duration(_timeout * qpid::sys::TIME_SEC),"DtxTimeout"), timeout(_timeout), mgr(_mgr), xid(_xid)
+    : TimerTask(qpid::sys::Duration(_timeout * qpid::sys::TIME_SEC),"DtxTimeout-"+_xid), timeout(_timeout), mgr(_mgr), xid(_xid)
 {
 }
 

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DtxTimeout.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DtxTimeout.h?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DtxTimeout.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DtxTimeout.h Mon Sep 19 15:13:18 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -29,7 +29,9 @@ namespace broker {
 
 class DtxManager;
 
-struct DtxTimeoutException : public Exception {};
+struct DtxTimeoutException : public Exception {  // e.g. thrown by DtxWorkRecord::add() once the branch has expired
+    DtxTimeoutException(const std::string& msg=std::string()) : Exception(msg) {}  // msg is forwarded to Exception
+};
 
 struct DtxTimeout : public sys::TimerTask
 {
@@ -37,7 +39,7 @@ struct DtxTimeout : public sys::TimerTas
     DtxManager& mgr;
     const std::string xid;
 
-    DtxTimeout(uint32_t timeout, DtxManager& mgr, const std::string& xid);    
+    DtxTimeout(uint32_t timeout, DtxManager& mgr, const std::string& xid);
     void fire();
 };
 

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp Mon Sep 19 15:13:18 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -28,19 +28,19 @@ using qpid::sys::Mutex;
 using namespace qpid::broker;
 using namespace qpid::framing;
 
-DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) : 
+DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) :
     xid(_xid), store(_store), completed(false), rolledback(false), prepared(false), expired(false) {}
 
-DtxWorkRecord::~DtxWorkRecord() 
+DtxWorkRecord::~DtxWorkRecord()
 {
-    if (timeout.get()) {  
+    if (timeout.get()) {
         timeout->cancel();
     }
 }
 
 bool DtxWorkRecord::prepare()
 {
-    Mutex::ScopedLock locker(lock);     
+    Mutex::ScopedLock locker(lock);
     if (check()) {
         txn = store->begin(xid);
         if (prepare(txn.get())) {
@@ -68,7 +68,7 @@ bool DtxWorkRecord::prepare(TransactionC
 
 bool DtxWorkRecord::commit(bool onePhase)
 {
-    Mutex::ScopedLock locker(lock); 
+    Mutex::ScopedLock locker(lock);
     if (check()) {
         if (prepared) {
             //already prepared i.e. 2pc
@@ -78,13 +78,13 @@ bool DtxWorkRecord::commit(bool onePhase
 
             store->commit(*txn);
             txn.reset();
-            
+
             std::for_each(work.begin(), work.end(), mem_fn(&TxBuffer::commit));
             return true;
         } else {
             //1pc commit optimisation, don't need a 2pc transaction context:
             if (!onePhase) {
-                throw IllegalStateException(QPID_MSG("Branch with xid " << xid << " has not been prepared, one-phase option required!"));        
+                throw IllegalStateException(QPID_MSG("Branch with xid " << xid << " has not been prepared, one-phase option required!"));
             }
             std::auto_ptr<TransactionContext> localtxn = store->begin();
             if (prepare(localtxn.get())) {
@@ -107,16 +107,16 @@ bool DtxWorkRecord::commit(bool onePhase
 
 void DtxWorkRecord::rollback()
 {
-    Mutex::ScopedLock locker(lock); 
+    Mutex::ScopedLock locker(lock);
     check();
     abort();
 }
 
 void DtxWorkRecord::add(DtxBuffer::shared_ptr ops)
 {
-    Mutex::ScopedLock locker(lock); 
+    Mutex::ScopedLock locker(lock);
     if (expired) {
-        throw DtxTimeoutException();
+        throw DtxTimeoutException(QPID_MSG("Branch with xid " << xid << " has timed out."));
     }
     if (completed) {
         throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " has been completed!"));
@@ -163,7 +163,7 @@ void DtxWorkRecord::recover(std::auto_pt
 
 void DtxWorkRecord::timedout()
 {
-    Mutex::ScopedLock locker(lock); 
+    Mutex::ScopedLock locker(lock);
     expired = true;
     rolledback = true;
     if (!completed) {
@@ -175,3 +175,17 @@ void DtxWorkRecord::timedout()
     }
     abort();
 }
+
+size_t DtxWorkRecord::indexOf(const DtxBuffer::shared_ptr& buf) {  // position of buf within 'work'
+    Work::iterator i = std::find(work.begin(), work.end(), buf);
+    if (i == work.end()) throw NotFoundException(  // buf is not part of this work record
+        QPID_MSG("Can't find DTX buffer for xid: " << buf->getXid()));
+    return i - work.begin();
+}
+
+DtxBuffer::shared_ptr DtxWorkRecord::operator[](size_t i) const {  // i-th DTX buffer; throws NotFoundException if out of range
+    if (i >= work.size())  // valid indices are [0, size()); i == size() is already past the end
+        throw NotFoundException(
+            QPID_MSG("Can't find DTX buffer " << i << " for xid: " << xid));
+    return work[i];
+}

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DtxWorkRecord.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DtxWorkRecord.h?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DtxWorkRecord.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/DtxWorkRecord.h Mon Sep 19 15:13:18 2011
@@ -73,9 +73,19 @@ public:
     void timedout();
     void setTimeout(boost::intrusive_ptr<DtxTimeout> t) { timeout = t; }
     boost::intrusive_ptr<DtxTimeout> getTimeout() { return timeout; }
+    std::string getXid() const { return xid; }
+    bool isCompleted() const { return completed; }
+    bool isRolledback() const { return rolledback; }
+    bool isPrepared() const { return prepared; }
+    bool isExpired() const { return expired; }
+
+    // Used by cluster update.
+    size_t size() const { return work.size(); }
+    DtxBuffer::shared_ptr operator[](size_t i) const;
+    uint32_t getTimeout() const { return timeout? timeout->timeout : 0; }
+    size_t indexOf(const DtxBuffer::shared_ptr&);
 };
 
-}
-}
+}} // qpid::broker
 
 #endif

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Exchange.cpp Mon Sep 19 15:13:18 2011
@@ -58,7 +58,7 @@ Exchange::PreRoute::PreRoute(Deliverable
 
         if (parent->sequence){
             parent->sequenceNo++;
-            msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,parent->sequenceNo);
+            msg.getMessage().insertCustomProperty(qpidMsgSequence,parent->sequenceNo);
         }
         if (parent->ive) {
             parent->lastMsg =  &( msg.getMessage());
@@ -390,7 +390,7 @@ bool Exchange::MatchQueue::operator()(Ex
 }
 
 void Exchange::setProperties(const boost::intrusive_ptr<Message>& msg) {
-    msg->getProperties<DeliveryProperties>()->setExchange(getName());
+    msg->setExchange(getName());
 }
 
 bool Exchange::routeWithAlternate(Deliverable& msg)

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/LegacyLVQ.cpp Mon Sep 19 15:13:18 2011
@@ -93,11 +93,7 @@ void LegacyLVQ::removeIf(Predicate p)
     //purging of an LVQ is not enabled if the broker is clustered
     //(expired messages will be removed on delivery and consolidated
     //by key as part of normal LVQ operation).
-
-    //TODO: Is there a neater way to check whether broker is
-    //clustered? Here we assume that if the clustered timer is the
-    //same as the regular timer, we are not clustered:
-    if (!broker || &(broker->getClusterTimer()) == &(broker->getTimer()))
+    if (!broker || !broker->isInCluster())
         MessageMap::removeIf(p);
 }
 

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Link.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Link.cpp Mon Sep 19 15:13:18 2011
@@ -248,6 +248,19 @@ void Link::ioThreadProcessing()
         return;
     QPID_LOG(debug, "Link::ioThreadProcessing()");
 
+    // Recover bridges whose session failed. stable_partition (unlike remove_if,
+    // whose tail is unspecified) leaves exactly the not-ready bridges at the end.
+    if (!active.empty()) {
+        Bridges::iterator failed = std::stable_partition(
+            active.begin(), active.end(), boost::bind(&Bridge::isSessionReady, _1));
+        for (Bridges::iterator i = failed; i != active.end(); ++i) {
+            (*i)->closed();
+            (*i)->cancel(*connection);
+            created.push_back(*i);  // queue for re-creation below
+        }
+        active.erase(failed, active.end());
+    }
+
     //process any pending creates and/or cancellations
     if (!created.empty()) {
         for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
@@ -296,7 +309,7 @@ void Link::maintenanceVisit ()
             }
         }
     }
-    else if (state == STATE_OPERATIONAL && (!created.empty() || !cancellations.empty()) && connection != 0)
+    else if (state == STATE_OPERATIONAL && (!active.empty() || !created.empty() || !cancellations.empty()) && connection != 0)
         connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
 }
 

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Message.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Message.cpp Mon Sep 19 15:13:18 2011
@@ -30,6 +30,7 @@
 #include "qpid/framing/SendContent.h"
 #include "qpid/framing/SequenceNumber.h"
 #include "qpid/framing/TypeFilter.h"
+#include "qpid/framing/reply_exceptions.h"
 #include "qpid/log/Statement.h"
 
 #include <time.h>
@@ -51,18 +52,9 @@ Message::Message(const framing::Sequence
     frames(id), persistenceId(0), redelivered(false), loaded(false),
     staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
     expiration(FAR_FUTURE), dequeueCallback(0),
-    inCallback(false), requiredCredit(0), isManagementMessage(false)
+    inCallback(false), requiredCredit(0), isManagementMessage(false), copyHeaderOnWrite(false)
 {}
 
-Message::Message(const Message& original) :
-    PersistableMessage(), frames(original.frames), persistenceId(0), redelivered(false), loaded(false),
-    staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
-    expiration(original.expiration), dequeueCallback(0),
-    inCallback(false), requiredCredit(0)
-{
-    setExpiryPolicy(original.expiryPolicy);
-}
-
 Message::~Message() {}
 
 void Message::forcePersistent()
@@ -288,6 +280,9 @@ void Message::sendHeader(framing::FrameH
     sys::Mutex::ScopedLock l(lock);
     Relay f(out);
     frames.map_if(f, TypeFilter<HEADER_BODY>());
+    //as frame (and pointer to body) has now been passed to handler,
+    //subsequent modifications should use a copy
+    copyHeaderOnWrite = true;
 }
 
 // TODO aconway 2007-11-09: Obsolete, remove. Was used to cover over
@@ -342,11 +337,30 @@ bool Message::isExcluded(const std::vect
     return false;
 }
 
+class CloneHeaderBody
+{
+public:
+    void operator()(AMQFrame& f)
+    {
+        f.cloneBody();
+    }
+};
+
+AMQHeaderBody* Message::getHeaderBody()
+{
+    if (copyHeaderOnWrite) {
+        CloneHeaderBody f;
+        frames.map_if(f, TypeFilter<HEADER_BODY>());
+        copyHeaderOnWrite = false;
+    }
+    return frames.getHeaders();
+}
+
 void Message::addTraceId(const std::string& id)
 {
     sys::Mutex::ScopedLock l(lock);
     if (isA<MessageTransferBody>()) {
-        FieldTable& headers = getProperties<MessageProperties>()->getApplicationHeaders();
+        FieldTable& headers = getModifiableProperties<MessageProperties>()->getApplicationHeaders();
         std::string trace = headers.getAsString(X_QPID_TRACE);
         if (trace.empty()) {
             headers.setString(X_QPID_TRACE, id);
@@ -360,7 +374,8 @@ void Message::addTraceId(const std::stri
 
 void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e)
 {
-    DeliveryProperties* props = getProperties<DeliveryProperties>();
+    sys::Mutex::ScopedLock l(lock);
+    DeliveryProperties* props = getModifiableProperties<DeliveryProperties>();
     if (props->getTtl()) {
         // AMQP requires setting the expiration property to be posix
         // time_t in seconds. TTL is in milliseconds
@@ -382,9 +397,9 @@ void Message::setTimestamp(const boost::
 
 void Message::adjustTtl()
 {
-    DeliveryProperties* props = getProperties<DeliveryProperties>();
+    sys::Mutex::ScopedLock l(lock);
+    DeliveryProperties* props = getModifiableProperties<DeliveryProperties>();
     if (props->getTtl()) {
-        sys::Mutex::ScopedLock l(lock);
         if (expiration < FAR_FUTURE) {
             sys::AbsTime current(
                 expiryPolicy ? expiryPolicy->getCurrentTime() : sys::AbsTime::now());
@@ -395,6 +410,42 @@ void Message::adjustTtl()
     }
 }
 
+void Message::setRedelivered()
+{
+    sys::Mutex::ScopedLock l(lock);
+    getModifiableProperties<framing::DeliveryProperties>()->setRedelivered(true);
+}
+
+void Message::insertCustomProperty(const std::string& key, int64_t value)
+{
+    sys::Mutex::ScopedLock l(lock);
+    getModifiableProperties<MessageProperties>()->getApplicationHeaders().setInt64(key,value);
+}
+
+void Message::insertCustomProperty(const std::string& key, const std::string& value)
+{
+    sys::Mutex::ScopedLock l(lock);
+    getModifiableProperties<MessageProperties>()->getApplicationHeaders().setString(key,value);
+}
+
+void Message::removeCustomProperty(const std::string& key)
+{
+    sys::Mutex::ScopedLock l(lock);
+    getModifiableProperties<MessageProperties>()->getApplicationHeaders().erase(key);
+}
+
+void Message::setExchange(const std::string& exchange)
+{
+    sys::Mutex::ScopedLock l(lock);
+    getModifiableProperties<DeliveryProperties>()->setExchange(exchange);
+}
+
+void Message::clearApplicationHeadersFlag()
+{
+    sys::Mutex::ScopedLock l(lock);
+    getModifiableProperties<MessageProperties>()->clearApplicationHeadersFlag();
+}
+
 void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) {
     expiryPolicy = e;
 }
@@ -442,11 +493,6 @@ uint8_t Message::getPriority() const {
     return getAdapter().getPriority(frames);
 }
 
-framing::FieldTable& Message::getOrInsertHeaders()
-{
-    return getProperties<MessageProperties>()->getApplicationHeaders();
-}
-
 bool Message::getIsManagementMessage() const { return isManagementMessage; }
 void Message::setIsManagementMessage(bool b) { isManagementMessage = b; }
 

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Message.h?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Message.h Mon Sep 19 15:13:18 2011
@@ -29,13 +29,17 @@
 #include "qpid/sys/Monitor.h"
 #include "qpid/sys/Time.h"
 #include <boost/function.hpp>
+#include <boost/intrusive_ptr.hpp>
 #include <boost/shared_ptr.hpp>
+#include <memory>
 #include <string>
 #include <vector>
 
 namespace qpid {
 
 namespace framing {
+class AMQBody;
+class AMQHeaderBody;
 class FieldTable;
 class SequenceNumber;
 }
@@ -53,7 +57,6 @@ public:
     typedef boost::function<void (const boost::intrusive_ptr<Message>&)> MessageCallback;
 
     QPID_BROKER_EXTERN Message(const framing::SequenceNumber& id = framing::SequenceNumber());
-    QPID_BROKER_EXTERN Message(const Message&);
     QPID_BROKER_EXTERN ~Message();
 
     uint64_t getPersistenceId() const { return persistenceId; }
@@ -75,7 +78,6 @@ public:
     bool isImmediate() const;
     QPID_BROKER_EXTERN const framing::FieldTable* getApplicationHeaders() const;
     QPID_BROKER_EXTERN std::string getAppId() const;
-    framing::FieldTable& getOrInsertHeaders();
     QPID_BROKER_EXTERN bool isPersistent() const;
     bool requiresAccept();
 
@@ -85,18 +87,19 @@ public:
     sys::AbsTime getExpiration() const { return expiration; }
     void setExpiration(sys::AbsTime exp) { expiration = exp; }
     void adjustTtl();
+    void setRedelivered();
+    QPID_BROKER_EXTERN void insertCustomProperty(const std::string& key, int64_t value);
+    QPID_BROKER_EXTERN void insertCustomProperty(const std::string& key, const std::string& value);
+    QPID_BROKER_EXTERN void removeCustomProperty(const std::string& key);
+    void setExchange(const std::string&);
+    void clearApplicationHeadersFlag();
 
     framing::FrameSet& getFrames() { return frames; }
     const framing::FrameSet& getFrames() const { return frames; }
 
-    template <class T> T* getProperties() {
-        qpid::framing::AMQHeaderBody* p = frames.getHeaders();
-        return p->get<T>(true);
-    }
-
     template <class T> const T* getProperties() const {
         const qpid::framing::AMQHeaderBody* p = frames.getHeaders();
-        return p->get<T>(true);
+        return p->get<T>();
     }
 
     template <class T> const T* hasProperties() const {
@@ -156,9 +159,8 @@ public:
     bool isExcluded(const std::vector<std::string>& excludes) const;
     void addTraceId(const std::string& id);
 
-       void forcePersistent();
-       bool isForcedPersistent();
-
+    void forcePersistent();
+    bool isForcedPersistent();
 
     /** Call cb when dequeue is complete, may call immediately. Holds cb by reference. */
     void setDequeueCompleteCallback(MessageCallback& cb);
@@ -178,7 +180,7 @@ public:
     bool redelivered;
     bool loaded;
     bool staged;
-	bool forcePersistentPolicy; // used to force message as durable, via a broker policy
+    bool forcePersistentPolicy; // used to force message as durable, via a broker policy
     ConnectionToken* publisher;
     mutable MessageAdapter* adapter;
     qpid::sys::AbsTime expiration;
@@ -194,6 +196,15 @@ public:
 
     uint32_t requiredCredit;
     bool isManagementMessage;
+      mutable bool copyHeaderOnWrite;
+
+    /**
+     * Expects lock to be held
+     */
+    template <class T> T* getModifiableProperties() {
+        return getHeaderBody()->get<T>(true);
+    }
+    qpid::framing::AMQHeaderBody* getHeaderBody();
 };
 
 }}

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/Queue.cpp Mon Sep 19 15:13:18 2011
@@ -702,7 +702,7 @@ void Queue::push(boost::intrusive_ptr<Me
     {
         Mutex::ScopedLock locker(messageLock);
         QueuedMessage qm(this, msg, ++sequence);
-        if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence);
+        if (insertSeqNo) msg->insertCustomProperty(seqNoKey, sequence);
 
         dequeueRequired = messages->push(qm, removed);
         listeners.populate(copy);
@@ -805,11 +805,6 @@ bool Queue::enqueue(TransactionContext* 
     }
 
     if (traceId.size()) {
-        //copy on write: take deep copy of message before modifying it
-        //as the frames may already be available for delivery on other
-        //threads
-        boost::intrusive_ptr<Message> copy(new Message(*msg));
-        msg = copy;
         msg->addTraceId(traceId);
     }
 

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp Mon Sep 19 15:13:18 2011
@@ -381,13 +381,17 @@ void CyrusAuthenticator::start(const str
     const char *challenge;
     unsigned int challenge_len;
     
-    QPID_LOG(debug, "SASL: Starting authentication with mechanism: " << mechanism);
+    // This should be at the same log level as the mechanism list in getMechanisms().
+    QPID_LOG(info, "SASL: Starting authentication with mechanism: " << mechanism);
     int code = sasl_server_start(sasl_conn,
                                  mechanism.c_str(),
-                                 response.c_str(), response.length(),
+                                 response.size() ? response.c_str() : 0, response.length(),
                                  &challenge, &challenge_len);
     
     processAuthenticationStep(code, challenge, challenge_len);
+    qmf::org::apache::qpid::broker::Connection* cnxMgmt = connection.getMgmtObject();
+    if ( cnxMgmt ) 
+        cnxMgmt->set_saslMechanism(mechanism);
 }
         
 void CyrusAuthenticator::step(const string& response)
@@ -424,10 +428,12 @@ void CyrusAuthenticator::processAuthenti
         client.secure(challenge_str);
     } else {
         std::string uid;
+        // Save the error detail before trying to retrieve the username: a failure there would overwrite it.
+        std::string errordetail = sasl_errdetail(sasl_conn);
         if (!getUsername(uid)) {
-            QPID_LOG(info, "SASL: Authentication failed (no username available):" << sasl_errdetail(sasl_conn));
+            QPID_LOG(info, "SASL: Authentication failed (no username available yet):" << errordetail);
         } else {
-            QPID_LOG(info, "SASL: Authentication failed for " << uid << ":" << sasl_errdetail(sasl_conn));
+            QPID_LOG(info, "SASL: Authentication failed for " << uid << ":" << errordetail);
         }
 
         // TODO: Change to more specific exceptions, when they are
@@ -459,6 +465,9 @@ std::auto_ptr<SecurityLayer> CyrusAuthen
     if (ssf) {
         securityLayer = std::auto_ptr<SecurityLayer>(new CyrusSecurityLayer(sasl_conn, maxFrameSize));
     }
+    qmf::org::apache::qpid::broker::Connection* cnxMgmt = connection.getMgmtObject();
+    if ( cnxMgmt ) 
+        cnxMgmt->set_saslSsf(ssf);
     return securityLayer;
 }
 

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/SemanticState.cpp Mon Sep 19 15:13:18 2011
@@ -132,6 +132,10 @@ bool SemanticState::cancel(const string&
         //should cancel all unacked messages for this consumer so that
         //they are not redelivered on recovery
         for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::cancel, _1, tag));
+        //can also remove any records that are now redundant
+        DeliveryRecords::iterator removed =
+            remove_if(unacked.begin(), unacked.end(), bind(&DeliveryRecord::isRedundant, _1));
+        unacked.erase(removed, unacked.end());
         return true;
     } else {
         return false;
@@ -177,8 +181,8 @@ void SemanticState::startDtx(const std::
     if (!dtxSelected) {
         throw CommandInvalidException(QPID_MSG("Session has not been selected for use with dtx"));
     }
-    dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer(xid));
-    txBuffer = boost::static_pointer_cast<TxBuffer>(dtxBuffer);
+    dtxBuffer.reset(new DtxBuffer(xid));
+    txBuffer = dtxBuffer;
     if (join) {
         mgr.join(xid, dtxBuffer);
     } else {
@@ -246,7 +250,7 @@ void SemanticState::resumeDtx(const std:
 
     checkDtxTimeout();
     dtxBuffer->setSuspended(false);
-    txBuffer = boost::static_pointer_cast<TxBuffer>(dtxBuffer);
+    txBuffer = dtxBuffer;
 }
 
 void SemanticState::checkDtxTimeout()
@@ -284,6 +288,7 @@ SemanticState::ConsumerImpl::ConsumerImp
     acquire(_acquire),
     blocked(true),
     windowing(true),
+    windowActive(false),
     exclusive(_exclusive),
     resumeId(_resumeId),
     tag(_tag),
@@ -534,7 +539,7 @@ void SemanticState::ConsumerImpl::comple
 {
     if (!delivery.isComplete()) {
         delivery.complete();
-        if (windowing) {
+        if (windowing && windowActive) {
             if (msgCredit != 0xFFFFFFFF) msgCredit++;
             if (byteCredit != 0xFFFFFFFF) byteCredit += delivery.getCredit();
         }
@@ -641,6 +646,7 @@ void SemanticState::ConsumerImpl::setCre
 void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)
 {
     assertClusterSafe();
+    if (windowing) windowActive = true;
     if (byteCredit != 0xFFFFFFFF) {
         if (value == 0xFFFFFFFF) byteCredit = value;
         else byteCredit += value;
@@ -650,6 +656,7 @@ void SemanticState::ConsumerImpl::addByt
 void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
 {
     assertClusterSafe();
+    if (windowing) windowActive = true;
     if (msgCredit != 0xFFFFFFFF) {
         if (value == 0xFFFFFFFF) msgCredit = value;
         else msgCredit += value;
@@ -670,7 +677,8 @@ void SemanticState::ConsumerImpl::flush(
 {
     while(haveCredit() && queue->dispatch(shared_from_this()))
         ;
-    stop();
+    msgCredit = 0;
+    byteCredit = 0;
 }
 
 void SemanticState::ConsumerImpl::stop()
@@ -678,6 +686,7 @@ void SemanticState::ConsumerImpl::stop()
     assertClusterSafe();
     msgCredit = 0;
     byteCredit = 0;
+    windowActive = false;
 }
 
 Queue::shared_ptr SemanticState::getQueue(const string& name) const {
@@ -711,6 +720,10 @@ void SemanticState::release(DeliveryId f
     DeliveryRecords::reverse_iterator start(range.end);
     DeliveryRecords::reverse_iterator end(range.start);
     for_each(start, end, boost::bind(&DeliveryRecord::release, _1, setRedelivered));
+
+    DeliveryRecords::iterator removed =
+        remove_if(range.start, range.end, bind(&DeliveryRecord::isRedundant, _1));
+    unacked.erase(removed, range.end);
 }
 
 void SemanticState::reject(DeliveryId first, DeliveryId last)

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/SemanticState.h?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/SemanticState.h Mon Sep 19 15:13:18 2011
@@ -65,7 +65,7 @@ class SessionContext;
  *
  * Message delivery is driven by ConsumerImpl::doOutput(), which is
  * called when a client's socket is ready to write data.
- * 
+ *
  */
 class SemanticState : private boost::noncopyable {
   public:
@@ -80,6 +80,7 @@ class SemanticState : private boost::non
         const bool acquire;
         bool blocked;
         bool windowing;
+        bool windowActive;
         bool exclusive;
         std::string resumeId;
         const std::string tag;  // <destination> from AMQP 0-10 Message.subscribe command
@@ -106,9 +107,9 @@ class SemanticState : private boost::non
                      uint64_t resumeTtl, const framing::FieldTable& arguments);
         ~ConsumerImpl();
         OwnershipToken* getSession();
-        bool deliver(QueuedMessage& msg);            
-        bool filter(boost::intrusive_ptr<Message> msg);            
-        bool accept(boost::intrusive_ptr<Message> msg);            
+        bool deliver(QueuedMessage& msg);
+        bool filter(boost::intrusive_ptr<Message> msg);
+        bool accept(boost::intrusive_ptr<Message> msg);
 
         void disableNotify();
         void enableNotify();
@@ -123,7 +124,7 @@ class SemanticState : private boost::non
         void addMessageCredit(uint32_t value);
         void flush();
         void stop();
-        void complete(DeliveryRecord&);    
+        void complete(DeliveryRecord&);
         boost::shared_ptr<Queue> getQueue() const { return queue; }
         bool isBlocked() const { return blocked; }
         bool setBlocked(bool set) { std::swap(set, blocked); return set; }
@@ -148,9 +149,10 @@ class SemanticState : private boost::non
         management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
     };
 
+    typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;
+
   private:
     typedef std::map<std::string, ConsumerImpl::shared_ptr> ConsumerImplMap;
-    typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;
 
     SessionContext& session;
     DeliveryAdapter& deliveryAdapter;
@@ -181,6 +183,7 @@ class SemanticState : private boost::non
     void disable(ConsumerImpl::shared_ptr);
 
   public:
+
     SemanticState(DeliveryAdapter&, SessionContext&);
     ~SemanticState();
 
@@ -189,7 +192,7 @@ class SemanticState : private boost::non
 
     const ConsumerImpl::shared_ptr find(const std::string& destination) const;
     bool find(const std::string& destination, ConsumerImpl::shared_ptr&) const;
-    
+
     /**
      * Get named queue, never returns 0.
      * @return: named queue
@@ -197,11 +200,11 @@ class SemanticState : private boost::non
      * @exception: ConnectionException if name="" and session has no default.
      */
     boost::shared_ptr<Queue> getQueue(const std::string& name) const;
-    
+
     bool exists(const std::string& consumerTag);
 
-    void consume(const std::string& destination, 
-                 boost::shared_ptr<Queue> queue, 
+    void consume(const std::string& destination,
+                 boost::shared_ptr<Queue> queue,
                  bool ackRequired, bool acquire, bool exclusive,
                  const std::string& resumeId=std::string(), uint64_t resumeTtl=0,
                  const framing::FieldTable& = framing::FieldTable());
@@ -219,12 +222,13 @@ class SemanticState : private boost::non
     void commit(MessageStore* const store);
     void rollback();
     void selectDtx();
+    bool getDtxSelected() const { return dtxSelected; }
     void startDtx(const std::string& xid, DtxManager& mgr, bool join);
     void endDtx(const std::string& xid, bool fail);
     void suspendDtx(const std::string& xid);
     void resumeDtx(const std::string& xid);
     void recover(bool requeue);
-    void deliver(DeliveryRecord& message, bool sync);            
+    void deliver(DeliveryRecord& message, bool sync);
     void acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired);
     void release(DeliveryId first, DeliveryId last, bool setRedelivered);
     void reject(DeliveryId first, DeliveryId last);
@@ -245,9 +249,12 @@ class SemanticState : private boost::non
     DeliveryRecords& getUnacked() { return unacked; }
     framing::SequenceSet getAccumulatedAck() const { return accumulatedAck; }
     TxBuffer::shared_ptr getTxBuffer() const { return txBuffer; }
+    DtxBuffer::shared_ptr getDtxBuffer() const { return dtxBuffer; }
     void setTxBuffer(const TxBuffer::shared_ptr& txb) { txBuffer = txb; }
+    void setDtxBuffer(const DtxBuffer::shared_ptr& dtxb) { dtxBuffer = dtxb; txBuffer = dtxb; }
     void setAccumulatedAck(const framing::SequenceSet& s) { accumulatedAck = s; }
     void record(const DeliveryRecord& delivery);
+    DtxBufferMap& getSuspendedXids() { return suspendedXids; }
 };
 
 }} // namespace qpid::broker



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org