You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/20 20:43:26 UTC

svn commit: r1186990 [6/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf2...

Modified: qpid/branches/QPID-2519/cpp/src/qmf/ConsoleSession.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qmf/ConsoleSession.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qmf/ConsoleSession.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qmf/ConsoleSession.cpp Thu Oct 20 18:42:46 2011
@@ -54,6 +54,7 @@ void ConsoleSession::setAgentFilter(cons
 void ConsoleSession::open() { impl->open(); }
 void ConsoleSession::close() { impl->close(); }
 bool ConsoleSession::nextEvent(ConsoleEvent& e, Duration t) { return impl->nextEvent(e, t); }
+int ConsoleSession::pendingEvents() const { return impl->pendingEvents(); }
 uint32_t ConsoleSession::getAgentCount() const { return impl->getAgentCount(); }
 Agent ConsoleSession::getAgent(uint32_t i) const { return impl->getAgent(i); }
 Agent ConsoleSession::getConnectedBrokerAgent() const { return impl->getConnectedBrokerAgent(); }
@@ -65,9 +66,9 @@ Subscription ConsoleSession::subscribe(c
 //========================================================================================
 
 ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) :
-    connection(c), domain("default"), maxAgentAgeMinutes(5),
-    opened(false), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0),
-    connectedBrokerInAgentList(false), schemaCache(new SchemaCache())
+    connection(c), domain("default"), maxAgentAgeMinutes(5), listenOnDirect(true), strictSecurity(false), maxThreadWaitTime(5),
+    opened(false), eventNotifier(0), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0),
+    connectedBrokerInAgentList(false), schemaCache(new SchemaCache()), nextCorrelator(1)
 {
     if (!options.empty()) {
         qpid::messaging::AddressParser parser(options);
@@ -91,7 +92,14 @@ ConsoleSessionImpl::ConsoleSessionImpl(C
         iter = optMap.find("strict-security");
         if (iter != optMap.end())
             strictSecurity = iter->second.asBool();
+
+        iter = optMap.find("max-thread-wait-time");
+        if (iter != optMap.end())
+            maxThreadWaitTime = iter->second.asUint32();
     }
+
+    if (maxThreadWaitTime > 60)
+        maxThreadWaitTime = 60;
 }
 
 
@@ -99,6 +107,11 @@ ConsoleSessionImpl::~ConsoleSessionImpl(
 {
     if (opened)
         close();
+
+    if (thread) {
+        thread->join();
+        delete thread;
+    }
 }
 
 
@@ -153,6 +166,12 @@ void ConsoleSessionImpl::open()
     if (opened)
         throw QmfException("The session is already open");
 
+    // If the thread exists, join and delete it before creating a new one.
+    if (thread) {
+        thread->join();
+        delete thread;
+    }
+
     // Establish messaging addresses
     directBase = "qmf." + domain + ".direct";
     topicBase = "qmf." + domain + ".topic";
@@ -181,45 +200,57 @@ void ConsoleSessionImpl::open()
 
     // Start the receiver thread
     threadCanceled = false;
+    opened = true;
     thread = new qpid::sys::Thread(*this);
 
     // Send an agent_locate to direct address 'broker' to identify the connected-broker-agent.
     sendBrokerLocate();
     if (agentQuery)
         sendAgentLocate();
-
-    opened = true;
 }
 
 
-void ConsoleSessionImpl::close()
+void ConsoleSessionImpl::closeAsync()
 {
     if (!opened)
         throw QmfException("The session is already closed");
 
-    // Stop and join the receiver thread
+    // Stop the receiver thread.  Don't join it until the destructor is called or open() is called.
     threadCanceled = true;
-    thread->join();
-    delete thread;
-
-    // Close the AMQP session
-    session.close();
     opened = false;
 }
 
 
+void ConsoleSessionImpl::close()
+{
+    closeAsync();
+
+    if (thread) {
+        thread->join();
+        delete thread;
+        thread = 0;
+    }
+}
+
+
 bool ConsoleSessionImpl::nextEvent(ConsoleEvent& event, Duration timeout)
 {
     uint64_t milliseconds = timeout.getMilliseconds();
     qpid::sys::Mutex::ScopedLock l(lock);
 
-    if (eventQueue.empty())
-        cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(),
-                                           qpid::sys::Duration(milliseconds * qpid::sys::TIME_MSEC)));
+    if (eventQueue.empty() && milliseconds > 0) {
+        int64_t nsecs(qpid::sys::TIME_INFINITE);
+        if ((uint64_t)(nsecs / 1000000) > milliseconds)
+            nsecs = (int64_t) milliseconds * 1000000;
+        qpid::sys::Duration then(nsecs);
+        cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), then));
+    }
 
     if (!eventQueue.empty()) {
         event = eventQueue.front();
         eventQueue.pop();
+        if (eventQueue.empty())
+            alertEventNotifierLH(false);
         return true;
     }
 
@@ -227,6 +258,27 @@ bool ConsoleSessionImpl::nextEvent(Conso
 }
 
 
+int ConsoleSessionImpl::pendingEvents() const
+{
+    qpid::sys::Mutex::ScopedLock l(lock);
+    return eventQueue.size();
+}
+
+
+void ConsoleSessionImpl::setEventNotifier(EventNotifierImpl* notifier)
+{
+    qpid::sys::Mutex::ScopedLock l(lock);
+    eventNotifier = notifier;
+}
+
+
+EventNotifierImpl* ConsoleSessionImpl::getEventNotifier() const
+{
+    qpid::sys::Mutex::ScopedLock l(lock);
+    return eventNotifier;
+}
+
+
 uint32_t ConsoleSessionImpl::getAgentCount() const
 {
     qpid::sys::Mutex::ScopedLock l(lock);
@@ -268,8 +320,10 @@ void ConsoleSessionImpl::enqueueEventLH(
 {
     bool notify = eventQueue.empty();
     eventQueue.push(event);
-    if (notify)
+    if (notify) {
         cond.notify();
+        alertEventNotifierLH(true);
+    }
 }
 
 
@@ -421,7 +475,23 @@ void ConsoleSessionImpl::handleAgentUpda
     iter = content.find("_values");
     if (iter == content.end())
         return;
-    Variant::Map attrs(iter->second.asMap());
+    const Variant::Map& in_attrs(iter->second.asMap());
+    Variant::Map attrs;
+
+    //
+    // Copy the map from the message to "attrs".  Translate any old-style
+    // keys to their new key values in the process.
+    //
+    for (iter = in_attrs.begin(); iter != in_attrs.end(); iter++) {
+        if      (iter->first == "epoch")
+            attrs[protocol::AGENT_ATTR_EPOCH] = iter->second;
+        else if (iter->first == "timestamp")
+            attrs[protocol::AGENT_ATTR_TIMESTAMP] = iter->second;
+        else if (iter->first == "heartbeat_interval")
+            attrs[protocol::AGENT_ATTR_HEARTBEAT_INTERVAL] = iter->second;
+        else
+            attrs[iter->first] = iter->second;
+    }
 
     iter = attrs.find(protocol::AGENT_ATTR_EPOCH);
     if (iter != attrs.end())
@@ -562,6 +632,13 @@ void ConsoleSessionImpl::periodicProcess
 }
 
 
+void ConsoleSessionImpl::alertEventNotifierLH(bool readable)
+{
+    if (eventNotifier)
+        eventNotifier->setReadable(readable);
+}
+
+
 void ConsoleSessionImpl::run()
 {
     QPID_LOG(debug, "ConsoleSession thread started");
@@ -572,7 +649,7 @@ void ConsoleSessionImpl::run()
                                qpid::sys::TIME_SEC);
 
             Receiver rx;
-            bool valid = session.nextReceiver(rx, Duration::SECOND);
+            bool valid = session.nextReceiver(rx, Duration::SECOND * maxThreadWaitTime);
             if (threadCanceled)
                 break;
             if (valid) {
@@ -589,6 +666,18 @@ void ConsoleSessionImpl::run()
         enqueueEvent(ConsoleEvent(new ConsoleEventImpl(CONSOLE_THREAD_FAILED)));
     }
 
+    session.close();
     QPID_LOG(debug, "ConsoleSession thread exiting");
 }
 
+
+ConsoleSessionImpl& ConsoleSessionImplAccess::get(ConsoleSession& session)
+{
+  return *session.impl;
+}
+
+
+const ConsoleSessionImpl& ConsoleSessionImplAccess::get(const ConsoleSession& session)
+{
+  return *session.impl;
+}

Modified: qpid/branches/QPID-2519/cpp/src/qmf/ConsoleSessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qmf/ConsoleSessionImpl.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qmf/ConsoleSessionImpl.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qmf/ConsoleSessionImpl.h Thu Oct 20 18:42:46 2011
@@ -27,6 +27,7 @@
 #include "qmf/SchemaId.h"
 #include "qmf/Schema.h"
 #include "qmf/ConsoleEventImpl.h"
+#include "qmf/EventNotifierImpl.h"
 #include "qmf/SchemaCache.h"
 #include "qmf/Query.h"
 #include "qpid/sys/Mutex.h"
@@ -41,9 +42,13 @@
 #include "qpid/messaging/Address.h"
 #include "qpid/management/Buffer.h"
 #include "qpid/types/Variant.h"
+
+#include <boost/shared_ptr.hpp>
 #include <map>
 #include <queue>
 
+using namespace std;
+
 namespace qmf {
     class ConsoleSessionImpl : public virtual qpid::RefCounted, public qpid::sys::Runnable {
     public:
@@ -56,8 +61,14 @@ namespace qmf {
         void setDomain(const std::string& d) { domain = d; }
         void setAgentFilter(const std::string& f);
         void open();
+        void closeAsync();
         void close();
         bool nextEvent(ConsoleEvent& e, qpid::messaging::Duration t);
+        int pendingEvents() const;
+
+        void setEventNotifier(EventNotifierImpl* notifier);
+        EventNotifierImpl* getEventNotifier() const;
+
         uint32_t getAgentCount() const;
         Agent getAgent(uint32_t i) const;
         Agent getConnectedBrokerAgent() const { return connectedBrokerAgent; }
@@ -75,9 +86,11 @@ namespace qmf {
         uint32_t maxAgentAgeMinutes;
         bool listenOnDirect;
         bool strictSecurity;
+        uint32_t maxThreadWaitTime;
         Query agentQuery;
         bool opened;
         std::queue<ConsoleEvent> eventQueue;
+        EventNotifierImpl* eventNotifier;
         qpid::sys::Thread* thread;
         bool threadCanceled;
         uint64_t lastVisit;
@@ -89,6 +102,8 @@ namespace qmf {
         std::string directBase;
         std::string topicBase;
         boost::shared_ptr<SchemaCache> schemaCache;
+        qpid::sys::Mutex corrlock;
+        uint32_t nextCorrelator;
 
         void enqueueEvent(const ConsoleEvent&);
         void enqueueEventLH(const ConsoleEvent&);
@@ -98,10 +113,17 @@ namespace qmf {
         void handleAgentUpdate(const std::string&, const qpid::types::Variant::Map&, const qpid::messaging::Message&);
         void handleV1SchemaResponse(qpid::management::Buffer&, uint32_t, const qpid::messaging::Message&);
         void periodicProcessing(uint64_t);
+        void alertEventNotifierLH(bool readable);
         void run();
+        uint32_t correlator() { qpid::sys::Mutex::ScopedLock l(corrlock); return nextCorrelator++; }
 
         friend class AgentImpl;
     };
+
+    struct ConsoleSessionImplAccess {
+        static ConsoleSessionImpl& get(ConsoleSession& session);
+        static const ConsoleSessionImpl& get(const ConsoleSession& session);
+    };
 }
 
 #endif

Modified: qpid/branches/QPID-2519/cpp/src/qmf/DataAddr.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qmf/DataAddr.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qmf/DataAddr.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qmf/DataAddr.cpp Thu Oct 20 18:42:46 2011
@@ -36,7 +36,9 @@ DataAddr::~DataAddr() { PI::dtor(*this);
 DataAddr& DataAddr::operator=(const DataAddr& s) { return PI::assign(*this, s); }
 
 bool DataAddr::operator==(const DataAddr& o) { return *impl == *o.impl; }
+bool DataAddr::operator==(const DataAddr& o) const { return *impl == *o.impl; }
 bool DataAddr::operator<(const DataAddr& o) { return *impl < *o.impl; }
+bool DataAddr::operator<(const DataAddr& o) const { return *impl < *o.impl; }
 
 DataAddr::DataAddr(const qpid::types::Variant::Map& m) { PI::ctor(*this, new DataAddrImpl(m)); }
 DataAddr::DataAddr(const string& n, const string& a, uint32_t e) { PI::ctor(*this, new DataAddrImpl(n, a, e)); }
@@ -45,7 +47,7 @@ const string& DataAddr::getAgentName() c
 uint32_t DataAddr::getAgentEpoch() const { return impl->getAgentEpoch(); }
 Variant::Map DataAddr::asMap() const { return impl->asMap(); }
 
-bool DataAddrImpl::operator==(const DataAddrImpl& other)
+bool DataAddrImpl::operator==(const DataAddrImpl& other) const
 {
     return
         agentName == other.agentName &&
@@ -54,7 +56,7 @@ bool DataAddrImpl::operator==(const Data
 }
 
 
-bool DataAddrImpl::operator<(const DataAddrImpl& other)
+bool DataAddrImpl::operator<(const DataAddrImpl& other) const
 {
     if (agentName < other.agentName) return true;
     if (agentName > other.agentName) return false;

Modified: qpid/branches/QPID-2519/cpp/src/qmf/DataAddrImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qmf/DataAddrImpl.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qmf/DataAddrImpl.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qmf/DataAddrImpl.h Thu Oct 20 18:42:46 2011
@@ -38,8 +38,8 @@ namespace qmf {
         //
         // Methods from API handle
         //
-        bool operator==(const DataAddrImpl&);
-        bool operator<(const DataAddrImpl&);
+        bool operator==(const DataAddrImpl&) const;
+        bool operator<(const DataAddrImpl&) const;
         DataAddrImpl(const qpid::types::Variant::Map&);
         DataAddrImpl(const std::string& _name, const std::string& _agentName, uint32_t _agentEpoch=0) :
             agentName(_agentName), name(_name), agentEpoch(_agentEpoch) {}

Modified: qpid/branches/QPID-2519/cpp/src/qmf/PrivateImplRef.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qmf/PrivateImplRef.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qmf/PrivateImplRef.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qmf/PrivateImplRef.h Thu Oct 20 18:42:46 2011
@@ -23,8 +23,8 @@
  */
 
 #include "qmf/ImportExport.h"
-#include <boost/intrusive_ptr.hpp>
 #include "qpid/RefCounted.h"
+#include <boost/intrusive_ptr.hpp>
 
 namespace qmf {
 

Propchange: qpid/branches/QPID-2519/cpp/src/qmf/engine/Agent.cpp
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Oct 20 18:42:46 2011
@@ -0,0 +1,2 @@
+/qpid/branches/qpid-2935/qpid/cpp/src/qmf/engine/Agent.cpp:1061302-1072333
+/qpid/trunk/qpid/cpp/src/qmf/engine/Agent.cpp:1072051-1185907

Modified: qpid/branches/QPID-2519/cpp/src/qmf/engine/ResilientConnection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qmf/engine/ResilientConnection.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qmf/engine/ResilientConnection.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qmf/engine/ResilientConnection.cpp Thu Oct 20 18:42:46 2011
@@ -334,8 +334,7 @@ void ResilientConnectionImpl::notify()
 {
     if (notifyFd != -1)
     {
-        int unused_ret;    //Suppress warnings about ignoring return value.
-        unused_ret = ::write(notifyFd, ".", 1);
+        (void) ::write(notifyFd, ".", 1);
     }
 }
 
@@ -432,8 +431,7 @@ void ResilientConnectionImpl::EnqueueEve
 
     if (notifyFd != -1)
     {
-        int unused_ret;    //Suppress warnings about ignoring return value.
-        unused_ret = ::write(notifyFd, ".", 1);
+        (void) ::write(notifyFd, ".", 1);
     }
 }
 

Modified: qpid/branches/QPID-2519/cpp/src/qmf/engine/SchemaImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qmf/engine/SchemaImpl.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qmf/engine/SchemaImpl.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qmf/engine/SchemaImpl.cpp Thu Oct 20 18:42:46 2011
@@ -35,17 +35,17 @@ using qpid::framing::Uuid;
 SchemaHash::SchemaHash()
 {
     for (int idx = 0; idx < 16; idx++)
-        hash[idx] = 0x5A;
+        hash.b[idx] = 0x5A;
 }
 
 void SchemaHash::encode(Buffer& buffer) const
 {
-    buffer.putBin128(hash);
+    buffer.putBin128(hash.b);
 }
 
 void SchemaHash::decode(Buffer& buffer)
 {
-    buffer.getBin128(hash);
+    buffer.getBin128(hash.b);
 }
 
 void SchemaHash::update(uint8_t data)
@@ -55,9 +55,8 @@ void SchemaHash::update(uint8_t data)
 
 void SchemaHash::update(const char* data, uint32_t len)
 {
-    uint64_t* first  = (uint64_t*) hash;
-    uint64_t* second = (uint64_t*) hash + 1;
-
+    uint64_t* first  = &hash.q[0];
+    uint64_t* second = &hash.q[1];
     for (uint32_t idx = 0; idx < len; idx++) {
         *first = *first ^ (uint64_t) data[idx];
         *second = *second << 1;

Modified: qpid/branches/QPID-2519/cpp/src/qmf/engine/SchemaImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qmf/engine/SchemaImpl.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qmf/engine/SchemaImpl.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qmf/engine/SchemaImpl.h Thu Oct 20 18:42:46 2011
@@ -35,7 +35,10 @@ namespace engine {
     //       they've been registered.
 
     class SchemaHash {
-        uint8_t hash[16];
+        union h {
+            uint8_t  b[16];
+            uint64_t q[2];
+        } hash;
     public:
         SchemaHash();
         void encode(qpid::framing::Buffer& buffer) const;
@@ -47,7 +50,7 @@ namespace engine {
         void update(Direction d) { update((uint8_t) d); }
         void update(Access a) { update((uint8_t) a); }
         void update(bool b) { update((uint8_t) (b ? 1 : 0)); }
-        const uint8_t* get() const { return hash; }
+        const uint8_t* get() const { return hash.b; }
         bool operator==(const SchemaHash& other) const;
         bool operator<(const SchemaHash& other) const;
         bool operator>(const SchemaHash& other) const;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/Address.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/Address.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/Address.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/Address.cpp Thu Oct 20 18:42:46 2011
@@ -28,7 +28,13 @@ namespace qpid {
 const string Address::TCP("tcp");
 
 ostream& operator<<(ostream& os, const Address& a) {
-    return os << a.protocol << ":" << a.host << ":" << a.port;
+    // If the host is an IPv6 literal we need to print "[]" around it
+    // (we detect IPv6 literals because they contain ":" which is otherwise illegal)
+    if (a.host.find(':') != string::npos) {
+        return os << a.protocol << ":[" << a.host << "]:" << a.port;
+    } else {
+        return os << a.protocol << ":" << a.host << ":" << a.port;
+    }
 }
 
 bool operator==(const Address& x, const Address& y) {

Modified: qpid/branches/QPID-2519/cpp/src/qpid/Exception.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/Exception.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/Exception.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/Exception.cpp Thu Oct 20 18:42:46 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,13 +21,25 @@
 
 #include "qpid/log/Statement.h"
 #include "qpid/Exception.h"
+#include "qpid/DisableExceptionLogging.h"
 #include <typeinfo>
 #include <assert.h>
 #include <string.h>
 
 namespace qpid {
 
+// Note on static initialization order: if an exception is constructed
+// in a static constructor before disableExceptionLogging has been
+// initialized, the worst that can happen is we lose an exception log
+// message. Since we shouldn't be throwing a lot of exceptions during
+// static construction this seems safe.
+static bool disableExceptionLogging = false;
+
+DisableExceptionLogging::DisableExceptionLogging() { disableExceptionLogging = true; }
+DisableExceptionLogging::~DisableExceptionLogging() { disableExceptionLogging = false; }
+
 Exception::Exception(const std::string& msg) throw() : message(msg) {
+    if (disableExceptionLogging) return;
     QPID_LOG_IF(debug, !msg.empty(), "Exception constructed: " << message);
 }
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/Modules.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/Modules.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/Modules.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/Modules.cpp Thu Oct 20 18:42:46 2011
@@ -64,7 +64,6 @@ void tryShlib(const char* libname_, bool
     if (!isShlibName(libname)) libname += suffix();
     try {
         sys::Shlib shlib(libname);
-        QPID_LOG (info, "Loaded Module: " << libname);
     }
     catch (const std::exception& /*e*/) {
         if (!noThrow)
@@ -82,7 +81,7 @@ void loadModuleDir (std::string dirname,
             return;
         throw Exception ("Directory not found: " + dirname);
     }
-    if (!fs::is_directory(dirPath)) 
+    if (!fs::is_directory(dirPath))
     {
         throw Exception ("Invalid value for module-dir: " + dirname + " is not a directory");
     }

Modified: qpid/branches/QPID-2519/cpp/src/qpid/Options.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/Options.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/Options.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/Options.cpp Thu Oct 20 18:42:46 2011
@@ -30,23 +30,6 @@ namespace qpid {
 using namespace std;
 
 
-/*
- *  ---------------------------------------------
- *  Explanation for Boost 103200 conditional code
- *  ---------------------------------------------
- *
- *  Please see large comment in Options.h .
- *
- */
-
-#if ( BOOST_VERSION == 103200 )
-std::vector<std::string> Options::long_names;
-std::vector<std::string> Options::short_names;
-#endif
-
-
-
-
 namespace {
 
 struct EnvOptMapper {
@@ -69,49 +52,11 @@ struct EnvOptMapper {
         static const std::string prefix("QPID_");
         if (envVar.substr(0, prefix.size()) == prefix) {
             string env = envVar.substr(prefix.size());
-#if (BOOST_VERSION >= 103300)
             typedef const std::vector< boost::shared_ptr<po::option_description> > OptDescs;
             OptDescs::const_iterator i =
                 find_if(opts.options().begin(), opts.options().end(), boost::bind(matchStr, env, _1));
             if (i != opts.options().end())
                 return (*i)->long_name();
-#else
-            /*
-             *  For Boost version 103200 and below.
-             *
-             *  In Boost version 103200, the options_description::options member,
-             *  used above, is private.  So what I will do here is use the 
-             *  count() funtion, which returns a 1 or 0 indicating presence or
-             *  absence of the environment variable.  
-             * 
-             * If it is present, I will return its name.  Env vars do not have
-             *  short and long forms, so the name is synonymous with the long 
-             *  name.  (This would not work for command line args.)
-             *  And if it's absent -- an empty string.
-             */
-
-
-            /*
-             * The env vars come in unaltered, i.e. QPID_FOO, but the 
-             * options are stored normalized as "qpid-foo".  Change the
-             * local variable "env" so it can be found by "opts".
-             */ 
-            for (std::string::iterator i = env.begin(); i != env.end(); ++i) 
-            {
-                *i = (*i == '_') 
-                     ? '-' 
-                     : ::tolower(*i);
-            }
-
-            if ( opts.count(env.c_str()) > 0 )
-            {
-              return env.c_str();
-            }
-            else
-            {
-              return string();
-            }
-#endif
         }
         return string();
     }
@@ -166,10 +111,6 @@ std::string prettyArg(const std::string&
 
 Options::Options(const string& name) : 
   po::options_description(name) 
-
-#if ( BOOST_VERSION == 103200 )
-  , m_less_easy(this, this)
-#endif
 {
 }
 
@@ -186,7 +127,6 @@ void Options::parse(int argc, char const
         parsing="command line options";
         if (argc > 0 && argv != 0) {
             if (allowUnknown) {
-#if (BOOST_VERSION >= 103300)
                 // This hideous workaround is required because boost 1.33 has a bug
                 // that causes 'allow_unregistered' to not work.
                 po::command_line_parser clp = po::command_line_parser(argc, const_cast<char**>(argv)).
@@ -200,113 +140,6 @@ void Options::parse(int argc, char const
                         filtopts.options.push_back (*i);
                 po::store(filtopts, vm);
 
-#elif ( BOOST_VERSION == 103200 )
-
-      /*
-       * "Tokenize" the argv to get rid of equals signs.
-       */
-      vector<string> tokenized_argv;
-      vector<string>::iterator iter;
-
-      for ( int i = 0; i < argc; ++ i )
-      {
-        string s = argv[i];
-        size_t equals_pos = s.find_first_of ( '=' );
-
-        if ( string::npos == equals_pos )  // There's no equals sign.  This is a token.
-        {
-          tokenized_argv.push_back(s);
-        }
-        else
-        {
-          // Two tokens -- before and after the equals position.
-          tokenized_argv.push_back ( s.substr(0, equals_pos) );
-          tokenized_argv.push_back ( s.substr(equals_pos+1) );
-        }
-      }
-
-
-      /*
-       * Now "filter" the tokenized argv, to get rid of all
-       * unrecognized options.  Because Boost 103200 has no
-       * facility for dealing gracefully with "unregistered" 
-       * options.
-       */
-      vector<string>            filtered_argv;
-      vector<string>::iterator  the_end = tokenized_argv.end();
-
-      // The program-name gets in for free...
-      iter = tokenized_argv.begin();
-      filtered_argv.push_back ( * iter );
-      ++ iter;
-
-      // ...but all other args get checked.
-      while ( iter < the_end )
-      {
-       /*
-        * If this is an argument that is registered,
-        * copy it to filtered_argv and also copy all
-        * of its arguments.
-        */
-       if ( is_registered_option ( * iter ) )
-       {
-         // Store this recognized arg.
-         filtered_argv.push_back ( * iter );
-         ++ iter;
-
-         // Copy all values for the above arg.
-         // Args are tokens that do not start with a minus.
-         while ( (iter < the_end) && ((* iter)[0] != '-') )
-         {
-           filtered_argv.push_back ( * iter );
-           ++ iter;
-         }
-       }
-       else
-       {
-         // Skip this unrecognized arg.
-         ++ iter;
-
-         // Copy all values for the above arg.
-         // Values are tokens that do not start with a minus.
-         while ( (iter < the_end) && ( '-' != (*iter)[0] ) )
-         {
-           ++ iter;
-         }
-       }
-     }
-
-     // Make an array of temporary C strings, because 
-     // the interface I can use wants it that way.
-     int     new_argc = filtered_argv.size();
-     char ** new_argv = new char * [ new_argc ];
-     int i = 0;
-
-     // cout << "NEW ARGV: |";
-     for ( iter = filtered_argv.begin();
-           iter < filtered_argv.end();
-           ++ iter, ++ i
-         )
-     {
-       new_argv[i] = strdup( (* iter).c_str() );
-       // cout << " " << new_argv[i] ;
-     }
-     // cout << "|\n";
-
-
-     // Use the array of C strings.
-     po::basic_parsed_options<char> bpo = po::parse_command_line(new_argc, const_cast<char**>(new_argv), *this);
-     po::store(bpo, vm);
-
-
-     // Now free the temporary C strings.
-     for ( i = 0; i < new_argc; ++ i )
-     {
-       free ( new_argv[i] );
-     }
-     delete[] new_argv;
-
-#endif
             }
             else
                 po::store(po::parse_command_line(argc, const_cast<char**>(argv), *this), vm);
@@ -363,107 +196,5 @@ CommonOptions::CommonOptions(const strin
 }
 
 
-
-
-#if ( BOOST_VERSION == 103200 )
-options_description_less_easy_init&
-options_description_less_easy_init::operator()(char const * name,
-           char const * description)
-{
-  // Snoop on the arguments....
-  owner->register_names ( name );
-  // ... then call parent function explicitly.
-  po::options_description_easy_init::operator() ( name, description );
-  return * this;
-}
-
-
-options_description_less_easy_init&
-options_description_less_easy_init::operator()(char const * name,
-           const po::value_semantic* s)
-{
-  // Snoop on the arguments....
-  owner->register_names ( name );
-  // ... then call parent function explicitly.
-  po::options_description_easy_init::operator() ( name, s );
-  return * this;
-}
-
-
-options_description_less_easy_init&
-options_description_less_easy_init::operator()(const char* name,
-           const po::value_semantic* s,
-           const char* description)
-{
-  // Snoop on the arguments....
-  owner->register_names ( name );
-  // ... then call parent function explicitly.
-  po::options_description_easy_init::operator() ( name, s, description );
-  return * this;
-}
-
-
-
-
-
-void
-Options::register_names ( std::string s )
-{
-  
-  std::string::size_type comma_pos = s.find_first_of ( ',' );
-
-  if ( std::string::npos == comma_pos )
-  {
-    // There is no short-name.
-    long_names.push_back ( s );
-  }
-  else
-  {
-    std::string long_name  = s.substr(0, comma_pos),
-                short_name = s.substr(comma_pos+1);
-    long_names .push_back ( long_name );
-    short_names.push_back ( short_name );
-  }
-  
-  /*
-   * There is no way to tell when the adding of new options is finished,
-   * so I re-sort after each one.
-   */
-  std::sort ( long_names .begin(), long_names .end() );
-  std::sort ( short_names.begin(), short_names.end() );
-}
-
-
-
-
-
-bool 
-Options::is_registered_option ( std::string s )
-{
-  std::string without_dashes = s.substr ( s.find_first_not_of ( '-' ) );
-  std::vector<std::string>::iterator i;
-
-  // Look among the long names.
-  i = std::find ( long_names.begin(),
-                  long_names.end(),
-                  without_dashes
-                );
-  if ( i != long_names.end() )
-    return true;
-
-  // Look among the short names.
-  i = std::find ( short_names.begin(),
-                  short_names.end(),
-                  without_dashes
-                );
-  if ( i != short_names.end() )
-    return true;
-
-
-  return false;
-}
-#endif
-
-
 } // namespace qpid
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/RefCounted.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/RefCounted.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/RefCounted.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/RefCounted.h Thu Oct 20 18:42:46 2011
@@ -53,8 +53,10 @@ protected:
 
 // intrusive_ptr support.
 namespace boost {
-inline void intrusive_ptr_add_ref(const qpid::RefCounted* p) { p->addRef(); }
-inline void intrusive_ptr_release(const qpid::RefCounted* p) { p->release(); }
+template <typename T>
+inline void intrusive_ptr_add_ref(const T* p) { p->qpid::RefCounted::addRef(); }
+template <typename T>
+inline void intrusive_ptr_release(const T* p) { p->qpid::RefCounted::release(); }
 }
 
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/RefCountedBuffer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/RefCountedBuffer.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/RefCountedBuffer.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/RefCountedBuffer.cpp Thu Oct 20 18:42:46 2011
@@ -20,34 +20,27 @@
  */
 
 #include "qpid/RefCountedBuffer.h"
+#include <stdlib.h>
 #include <new>
 
 namespace qpid {
 
-RefCountedBuffer::RefCountedBuffer() : count(0) {}
-
-void RefCountedBuffer::destroy() const {
+void RefCountedBuffer::released() const {
     this->~RefCountedBuffer();
-    ::delete[] reinterpret_cast<const char*>(this);
-}
-
-char* RefCountedBuffer::addr() const {
-    return const_cast<char*>(reinterpret_cast<const char*>(this)+sizeof(RefCountedBuffer));
+    ::free (reinterpret_cast<void *>(const_cast<RefCountedBuffer *>(this)));
 }
 
-RefCountedBuffer::pointer RefCountedBuffer::create(size_t n) {
-    char* store=::new char[n+sizeof(RefCountedBuffer)];
+BufferRef RefCountedBuffer::create(size_t n) {
+    void* store=::malloc (n + sizeof(RefCountedBuffer));
+    if (NULL == store)
+        throw std::bad_alloc();
     new(store) RefCountedBuffer;
-    return pointer(reinterpret_cast<RefCountedBuffer*>(store));
+    char* start = reinterpret_cast<char *>(store) + sizeof(RefCountedBuffer);
+    return BufferRef(
+        boost::intrusive_ptr<RefCounted>(reinterpret_cast<RefCountedBuffer*>(store)),
+        start, start+n);
 }
 
-RefCountedBuffer::pointer::pointer() {}
-RefCountedBuffer::pointer::pointer(RefCountedBuffer* x) : p(x) {}
-RefCountedBuffer::pointer::pointer(const pointer& x) : p(x.p) {}
-RefCountedBuffer::pointer::~pointer() {}
-RefCountedBuffer::pointer& RefCountedBuffer::pointer::operator=(const RefCountedBuffer::pointer& x) { p = x.p; return *this; }
-
-char* RefCountedBuffer::pointer::cp() const { return p ? p->get() : 0; }
 } // namespace qpid
 
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/RefCountedBuffer.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/RefCountedBuffer.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/RefCountedBuffer.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/RefCountedBuffer.h Thu Oct 20 18:42:46 2011
@@ -22,68 +22,23 @@
  *
  */
 
-#include <boost/utility.hpp>
-#include <boost/detail/atomic_count.hpp>
-#include <boost/intrusive_ptr.hpp>
+#include <qpid/RefCounted.h>
+#include <qpid/BufferRef.h>
 
 namespace qpid {
 
 /**
- * Reference-counted byte buffer.
- * No alignment guarantees.
+ * Reference-counted byte buffer. No alignment guarantees.
  */
-class RefCountedBuffer : boost::noncopyable {
-    mutable boost::detail::atomic_count count;
-    RefCountedBuffer();
-    void destroy() const;
-    char* addr() const;
-
-public:
-    /** Smart char pointer to a reference counted buffer */
-    class pointer {
-        boost::intrusive_ptr<RefCountedBuffer> p;
-        char* cp() const;
-        pointer(RefCountedBuffer* x);
-      friend class RefCountedBuffer;
-
-      public:
-        pointer();
-        pointer(const pointer&);
-        ~pointer();
-        pointer& operator=(const pointer&);
-
-        char* get() { return cp(); }
-        operator char*() { return cp(); }
-        char& operator*() { return *cp(); }
-        char& operator[](size_t i) { return cp()[i]; }
-
-        const char* get() const { return cp(); }
-        operator const char*() const { return cp(); }
-        const char& operator*() const { return *cp(); }
-        const char& operator[](size_t i) const { return cp()[i]; }
-    };
-
+class RefCountedBuffer : public RefCounted {
+  public:
     /** Create a reference counted buffer of size n */
-    static pointer create(size_t n);
+    static BufferRef create(size_t n);
 
-    /** Get a pointer to the start of the buffer. */
-    char* get() { return addr(); }
-    const char* get() const { return addr(); }
-    char& operator[](size_t i) { return get()[i]; }
-    const char& operator[](size_t i) const { return get()[i]; }
-
-    void addRef() const { ++count; }
-    void release() const { if (--count==0) destroy(); }
-    long refCount() { return count; }
+  protected:
+    void released() const;
 };
 
 } // namespace qpid
 
-// intrusive_ptr support.
-namespace boost {
-inline void intrusive_ptr_add_ref(const qpid::RefCountedBuffer* p) { p->addRef(); }
-inline void intrusive_ptr_release(const qpid::RefCountedBuffer* p) { p->release(); }
-}
-
-
 #endif  /*!QPID_REFCOUNTEDBUFFER_H*/

Modified: qpid/branches/QPID-2519/cpp/src/qpid/Sasl.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/Sasl.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/Sasl.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/Sasl.h Thu Oct 20 18:42:46 2011
@@ -47,8 +47,8 @@ class Sasl
      *             client supports.
      * @param externalSecuritySettings security related details from the underlying transport
      */
-    virtual std::string start(const std::string& mechanisms,
-                              const qpid::sys::SecuritySettings* externalSecuritySettings = 0) = 0;
+    virtual bool start(const std::string& mechanisms, std::string& response,
+                       const qpid::sys::SecuritySettings* externalSecuritySettings = 0) = 0;
     virtual std::string step(const std::string& challenge) = 0;
     virtual std::string getMechanism() = 0;
     virtual std::string getUserId() = 0;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/SaslFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/SaslFactory.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/SaslFactory.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/SaslFactory.cpp Thu Oct 20 18:42:46 2011
@@ -112,7 +112,7 @@ class CyrusSasl : public Sasl
   public:
     CyrusSasl(const std::string & username, const std::string & password, const std::string & serviceName, const std::string & hostName, int minSsf, int maxSsf, bool allowInteraction);
     ~CyrusSasl();
-    std::string start(const std::string& mechanisms, const SecuritySettings* externalSettings);
+    bool start(const std::string& mechanisms, std::string& response, const SecuritySettings* externalSettings);
     std::string step(const std::string& challenge);
     std::string getMechanism();
     std::string getUserId();
@@ -182,17 +182,18 @@ CyrusSasl::CyrusSasl(const std::string &
         callbacks[i].id = SASL_CB_AUTHNAME;
         callbacks[i].proc = (CallbackProc*) &getUserFromSettings;
         callbacks[i++].context = &settings;
-    }
 
-    callbacks[i].id = SASL_CB_PASS;
-    if (settings.password.empty()) {
-        callbacks[i].proc = 0;
-        callbacks[i++].context = 0;        
-    } else {
-        callbacks[i].proc = (CallbackProc*) &getPasswordFromSettings;
-        callbacks[i++].context = &settings;
+        callbacks[i].id = SASL_CB_PASS;
+        if (settings.password.empty()) {
+            callbacks[i].proc = 0;
+            callbacks[i++].context = 0;        
+        } else {
+            callbacks[i].proc = (CallbackProc*) &getPasswordFromSettings;
+            callbacks[i++].context = &settings;
+        }
     }
 
+
     callbacks[i].id = SASL_CB_LIST_END;
     callbacks[i].proc = 0;
     callbacks[i++].context = 0;
@@ -209,7 +210,7 @@ namespace {
     const std::string SSL("ssl");
 }
 
-std::string CyrusSasl::start(const std::string& mechanisms, const SecuritySettings* externalSettings)
+bool CyrusSasl::start(const std::string& mechanisms, std::string& response, const SecuritySettings* externalSettings)
 {
     QPID_LOG(debug, "CyrusSasl::start(" << mechanisms << ")");
     int result = sasl_client_new(settings.service.c_str(),
@@ -282,7 +283,12 @@ std::string CyrusSasl::start(const std::
     mechanism = std::string(chosenMechanism);
     QPID_LOG(debug, "CyrusSasl::start(" << mechanisms << "): selected "
              << mechanism << " response: '" << std::string(out, outlen) << "'");
-    return std::string(out, outlen);
+    if (out) {
+        response = std::string(out, outlen);
+        return true;
+    } else {
+        return false;
+    }
 }
 
 std::string CyrusSasl::step(const std::string& challenge)

Modified: qpid/branches/QPID-2519/cpp/src/qpid/Url.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/Url.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/Url.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/Url.cpp Thu Oct 20 18:42:46 2011
@@ -156,11 +156,12 @@ class UrlParser {
         return false;
     }
 
-    // TODO aconway 2008-11-20: this does not fully implement
-    // http://www.ietf.org/rfc/rfc3986.txt. Works for DNS names and
-    // ipv4 literals but won't handle ipv6.
+    // A liberal interpretation of http://www.ietf.org/rfc/rfc3986.txt.
+    // Works for DNS names and ipv4 and ipv6 literals
     // 
     bool host(string& h) {
+        if (ip6literal(h)) return true;
+
         const char* start=i;
         while (unreserved() || pctEncoded())
             ;
@@ -169,6 +170,22 @@ class UrlParser {
         return true;
     }
 
+    // This is a bit too liberal for IPv6 literal addresses, but probably good enough
+    bool ip6literal(string& h) {
+        if (literal("[")) {
+            const char* start = i;
+            while (hexDigit() || literal(":") || literal("."))
+                ;
+            const char* end = i;
+            if ( end-start < 2 ) return false; // Smallest valid address is "::"
+            if (literal("]")) {
+                h.assign(start, end);
+                return true;
+            }
+        }
+        return false;
+    }
+
     bool unreserved() { return (::isalnum(*i) || ::strchr("-._~", *i)) && advance(); }
 
     bool pctEncoded() { return literal("%") && hexDigit() && hexDigit(); }

Modified: qpid/branches/QPID-2519/cpp/src/qpid/acl/Acl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/acl/Acl.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/acl/Acl.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/acl/Acl.cpp Thu Oct 20 18:42:46 2011
@@ -180,7 +180,10 @@ Acl::Acl (AclValues& av, Broker& b): acl
       {
       case _qmf::Acl::METHOD_RELOADACLFILE :
           readAclFile(text);
-          status = Manageable::STATUS_USER;
+          if (text.empty())
+              status = Manageable::STATUS_OK;
+          else
+              status = Manageable::STATUS_USER;
           break;
       }
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/acl/AclPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/acl/AclPlugin.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/acl/AclPlugin.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/acl/AclPlugin.cpp Thu Oct 20 18:42:46 2011
@@ -69,7 +69,7 @@ struct AclPlugin : public Plugin {
     	}
 
         acl = new Acl(values, b);
-		b.setAcl(acl.get());
+        b.setAcl(acl.get());
         b.addFinalizer(boost::bind(&AclPlugin::shutdown, this));
     }
 

Modified: qpid/branches/QPID-2519/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/agent/ManagementAgentImpl.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/agent/ManagementAgentImpl.cpp Thu Oct 20 18:42:46 2011
@@ -305,43 +305,47 @@ void ManagementAgentImpl::raiseEvent(con
         "emerg", "alert", "crit", "error", "warn",
         "note", "info", "debug"
     };
-    sys::Mutex::ScopedLock lock(agentLock);
-    Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE);
-    uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity;
+    string content;
     stringstream key;
+    Variant::Map headers;
 
-    // key << "console.event." << assignedBrokerBank << "." << assignedAgentBank << "." <<
-    // event.getPackageName() << "." << event.getEventName();
-    key << "agent.ind.event." << keyifyNameStr(event.getPackageName())
-        << "." << keyifyNameStr(event.getEventName())
-        << "." << severityStr[sev]
-        << "." << vendorNameKey
-        << "." << productNameKey
-        << "." << instanceNameKey;
+    {
+        sys::Mutex::ScopedLock lock(agentLock);
+        Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE);
+        uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity;
 
-    Variant::Map map_;
-    Variant::Map schemaId;
-    Variant::Map values;
-    Variant::Map headers;
-    string content;
+        // key << "console.event." << assignedBrokerBank << "." << assignedAgentBank << "." <<
+        // event.getPackageName() << "." << event.getEventName();
+        key << "agent.ind.event." << keyifyNameStr(event.getPackageName())
+            << "." << keyifyNameStr(event.getEventName())
+            << "." << severityStr[sev]
+            << "." << vendorNameKey
+            << "." << productNameKey
+            << "." << instanceNameKey;
+
+        Variant::Map map_;
+        Variant::Map schemaId;
+        Variant::Map values;
+
+        map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(),
+                                               event.getEventName(),
+                                               event.getMd5Sum(),
+                                               ManagementItem::CLASS_KIND_EVENT);
+        event.mapEncode(values);
+        map_["_values"] = values;
+        map_["_timestamp"] = uint64_t(Duration(EPOCH, now()));
+        map_["_severity"] = sev;
 
-    map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(),
-                                           event.getEventName(),
-                                           event.getMd5Sum(),
-                                           ManagementItem::CLASS_KIND_EVENT);
-    event.mapEncode(values);
-    map_["_values"] = values;
-    map_["_timestamp"] = uint64_t(Duration(EPOCH, now()));
-    map_["_severity"] = sev;
+        headers["method"] = "indication";
+        headers["qmf.opcode"] = "_data_indication";
+        headers["qmf.content"] = "_event";
+        headers["qmf.agent"] = name_address;
 
-    headers["method"] = "indication";
-    headers["qmf.opcode"] = "_data_indication";
-    headers["qmf.content"] = "_event";
-    headers["qmf.agent"] = name_address;
+        Variant::List list;
+        list.push_back(map_);
+        ListCodec::encode(list, content);
+    }
 
-    Variant::List list;
-    list.push_back(map_);
-    ListCodec::encode(list, content);
     connThreadBody.sendBuffer(content, "", headers, topicExchange, key.str(), "amqp/list");
 }
 
@@ -521,9 +525,12 @@ void ManagementAgentImpl::sendException(
 
 void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, const string& rte, const string& rtk)
 {
-    sys::Mutex::ScopedLock lock(agentLock);
     string packageName;
     SchemaClassKey key;
+    uint32_t outLen(0);
+    char localBuffer[MA_BUFFER_SIZE];
+    Buffer outBuffer(localBuffer, MA_BUFFER_SIZE);
+    bool found(false);
 
     inBuffer.getShortString(packageName);
     inBuffer.getShortString(key.name);
@@ -531,26 +538,30 @@ void ManagementAgentImpl::handleSchemaRe
 
     QPID_LOG(trace, "RCVD SchemaRequest: package=" << packageName << " class=" << key.name);
 
-    PackageMap::iterator pIter = packages.find(packageName);
-    if (pIter != packages.end()) {
-        ClassMap& cMap = pIter->second;
-        ClassMap::iterator cIter = cMap.find(key);
-        if (cIter != cMap.end()) {
-            SchemaClass& schema = cIter->second;
-            Buffer   outBuffer(outputBuffer, MA_BUFFER_SIZE);
-            uint32_t outLen;
-            string   body;
-
-            encodeHeader(outBuffer, 's', sequence);
-            schema.writeSchemaCall(body);
-            outBuffer.putRawData(body);
-            outLen = MA_BUFFER_SIZE - outBuffer.available();
-            outBuffer.reset();
-            connThreadBody.sendBuffer(outBuffer, outLen, rte, rtk);
-
-            QPID_LOG(trace, "SENT SchemaInd: package=" << packageName << " class=" << key.name);
+    {
+        sys::Mutex::ScopedLock lock(agentLock);
+        PackageMap::iterator pIter = packages.find(packageName);
+        if (pIter != packages.end()) {
+            ClassMap& cMap = pIter->second;
+            ClassMap::iterator cIter = cMap.find(key);
+            if (cIter != cMap.end()) {
+                SchemaClass& schema = cIter->second;
+                string   body;
+
+                encodeHeader(outBuffer, 's', sequence);
+                schema.writeSchemaCall(body);
+                outBuffer.putRawData(body);
+                outLen = MA_BUFFER_SIZE - outBuffer.available();
+                outBuffer.reset();
+                found = true;
+            }
         }
     }
+
+    if (found) {
+        connThreadBody.sendBuffer(outBuffer, outLen, rte, rtk);
+        QPID_LOG(trace, "SENT SchemaInd: package=" << packageName << " class=" << key.name);
+    }
 }
 
 void ManagementAgentImpl::handleConsoleAddedIndication()
@@ -969,18 +980,6 @@ ManagementAgentImpl::PackageMap::iterato
     pair<PackageMap::iterator, bool> result =
         packages.insert(pair<string, ClassMap>(name, ClassMap()));
 
-    if (connected) {
-        // Publish a package-indication message
-        Buffer   outBuffer(outputBuffer, MA_BUFFER_SIZE);
-        uint32_t outLen;
-
-        encodeHeader(outBuffer, 'p');
-        encodePackageIndication(outBuffer, result.first);
-        outLen = MA_BUFFER_SIZE - outBuffer.available();
-        outBuffer.reset();
-        connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "schema.package");
-    }
-
     return result.first;
 }
 
@@ -1038,131 +1037,146 @@ void ManagementAgentImpl::encodeClassInd
     QPID_LOG(trace, "SENT ClassInd: package=" << (*pIter).first << " class=" << key.name);
 }
 
+struct MessageItem {
+    string content;
+    Variant::Map headers;
+    string key;
+    MessageItem(const Variant::Map& h, const string& k) : headers(h), key(k) {}
+};
+
 void ManagementAgentImpl::periodicProcessing()
 {
     string addr_key_base = "agent.ind.data.";
-    sys::Mutex::ScopedLock lock(agentLock);
     list<ObjectId> deleteList;
-
-    if (!connected)
-        return;
+    list<boost::shared_ptr<MessageItem> > message_list;
 
     sendHeartbeat();
 
-    moveNewObjectsLH();
-
-    //
-    //  Clear the been-here flag on all objects in the map.
-    //
-    for (ObjectMap::iterator iter = managementObjects.begin();
-         iter != managementObjects.end();
-         iter++) {
-        ManagementObject* object = iter->second.get();
-        object->setFlags(0);
-        if (publishAllData) {
-            object->setForcePublish(true);
-        }
-    }
+    {
+        sys::Mutex::ScopedLock lock(agentLock);
 
-    publishAllData = false;
+        if (!connected)
+            return;
 
-    //
-    //  Process the entire object map.
-    //
-    uint32_t v2Objs = 0;
-
-    for (ObjectMap::iterator baseIter = managementObjects.begin();
-         baseIter != managementObjects.end();
-         baseIter++) {
-        ManagementObject* baseObject = baseIter->second.get();
+        moveNewObjectsLH();
 
         //
-        //  Skip until we find a base object requiring a sent message.
+        //  Clear the been-here flag on all objects in the map.
         //
-        if (baseObject->getFlags() == 1 ||
-            (!baseObject->getConfigChanged() &&
-             !baseObject->getInstChanged() &&
-             !baseObject->getForcePublish() &&
-             !baseObject->isDeleted()))
-            continue;
-
-        std::string packageName = baseObject->getPackageName();
-        std::string className = baseObject->getClassName();
-
-        Variant::List list_;
-        string content;
-        std::stringstream addr_key;
-        Variant::Map  headers;
-
-        addr_key << addr_key_base;
-        addr_key << keyifyNameStr(packageName)
-                 << "." << keyifyNameStr(className)
-                 << "." << vendorNameKey
-                 << "." << productNameKey
-                 << "." << instanceNameKey;
-
-        headers["method"] = "indication";
-        headers["qmf.opcode"] = "_data_indication";
-        headers["qmf.content"] = "_data";
-        headers["qmf.agent"] = name_address;
-
-        for (ObjectMap::iterator iter = baseIter;
+        for (ObjectMap::iterator iter = managementObjects.begin();
              iter != managementObjects.end();
              iter++) {
             ManagementObject* object = iter->second.get();
-            bool send_stats, send_props;
-            if (baseObject->isSameClass(*object) && object->getFlags() == 0) {
-                object->setFlags(1);
-                if (object->getConfigChanged() || object->getInstChanged())
-                    object->setUpdateTime();
+            object->setFlags(0);
+            if (publishAllData) {
+                object->setForcePublish(true);
+            }
+        }
+
+        publishAllData = false;
+
+        //
+        //  Process the entire object map.
+        //
+        uint32_t v2Objs = 0;
 
-                send_props = (object->getConfigChanged() || object->getForcePublish() || object->isDeleted());
-                send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish()));
+        for (ObjectMap::iterator baseIter = managementObjects.begin();
+             baseIter != managementObjects.end();
+             baseIter++) {
+            ManagementObject* baseObject = baseIter->second.get();
+
+            //
+            //  Skip until we find a base object requiring a sent message.
+            //
+            if (baseObject->getFlags() == 1 ||
+                (!baseObject->getConfigChanged() &&
+                 !baseObject->getInstChanged() &&
+                 !baseObject->getForcePublish() &&
+                 !baseObject->isDeleted()))
+                continue;
+
+            std::string packageName = baseObject->getPackageName();
+            std::string className = baseObject->getClassName();
+
+            Variant::List list_;
+            std::stringstream addr_key;
+            Variant::Map  headers;
+
+            addr_key << addr_key_base;
+            addr_key << keyifyNameStr(packageName)
+                     << "." << keyifyNameStr(className)
+                     << "." << vendorNameKey
+                     << "." << productNameKey
+                     << "." << instanceNameKey;
+
+            headers["method"] = "indication";
+            headers["qmf.opcode"] = "_data_indication";
+            headers["qmf.content"] = "_data";
+            headers["qmf.agent"] = name_address;
+
+            for (ObjectMap::iterator iter = baseIter;
+                 iter != managementObjects.end();
+                 iter++) {
+                ManagementObject* object = iter->second.get();
+                bool send_stats, send_props;
+                if (baseObject->isSameClass(*object) && object->getFlags() == 0) {
+                    object->setFlags(1);
+                    if (object->getConfigChanged() || object->getInstChanged())
+                        object->setUpdateTime();
+
+                    send_props = (object->getConfigChanged() || object->getForcePublish() || object->isDeleted());
+                    send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish()));
+
+                    if (send_stats || send_props) {
+                        Variant::Map map_;
+                        Variant::Map values;
+                        Variant::Map oid;
 
-                if (send_stats || send_props) {
-                    Variant::Map map_;
-                    Variant::Map values;
-                    Variant::Map oid;
-
-                    object->getObjectId().mapEncode(oid);
-                    map_["_object_id"] = oid;
-                    map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
-                                                           object->getClassName(),
-                                                           object->getMd5Sum());
-                    object->writeTimestamps(map_);
-                    object->mapEncodeValues(values, send_props, send_stats);
-                    map_["_values"] = values;
-                    list_.push_back(map_);
-
-                    if (++v2Objs >= maxV2ReplyObjs) {
-                        v2Objs = 0;
-                        ListCodec::encode(list_, content);
-
-                        connThreadBody.sendBuffer(content, "", headers, topicExchange, addr_key.str(), "amqp/list");
-                        list_.clear();
-                        content.clear();
-                        QPID_LOG(trace, "SENT DataIndication");
+                        object->getObjectId().mapEncode(oid);
+                        map_["_object_id"] = oid;
+                        map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+                                                               object->getClassName(),
+                                                               object->getMd5Sum());
+                        object->writeTimestamps(map_);
+                        object->mapEncodeValues(values, send_props, send_stats);
+                        map_["_values"] = values;
+                        list_.push_back(map_);
+
+                        if (++v2Objs >= maxV2ReplyObjs) {
+                            v2Objs = 0;
+                            boost::shared_ptr<MessageItem> item(new MessageItem(headers, addr_key.str()));
+                            ListCodec::encode(list_, item->content);
+                            message_list.push_back(item);
+                            list_.clear();
+                        }
                     }
+
+                    if (object->isDeleted())
+                        deleteList.push_back(iter->first);
+                    object->setForcePublish(false);
                 }
+            }
 
-                if (object->isDeleted())
-                    deleteList.push_back(iter->first);
-                object->setForcePublish(false);
+            if (!list_.empty()) {
+                boost::shared_ptr<MessageItem> item(new MessageItem(headers, addr_key.str()));
+                ListCodec::encode(list_, item->content);
+                message_list.push_back(item);
             }
         }
 
-        if (!list_.empty()) {
-            ListCodec::encode(list_, content);
-            connThreadBody.sendBuffer(content, "", headers, topicExchange, addr_key.str(), "amqp/list");
-            QPID_LOG(trace, "SENT DataIndication");
-        }
+        // Delete flagged objects
+        for (list<ObjectId>::reverse_iterator iter = deleteList.rbegin();
+             iter != deleteList.rend();
+             iter++)
+            managementObjects.erase(*iter);
     }
 
-    // Delete flagged objects
-    for (list<ObjectId>::reverse_iterator iter = deleteList.rbegin();
-         iter != deleteList.rend();
-         iter++)
-        managementObjects.erase(*iter);
+    while (!message_list.empty()) {
+        boost::shared_ptr<MessageItem> item(message_list.front());
+        message_list.pop_front();
+        connThreadBody.sendBuffer(item->content, "", item->headers, topicExchange, item->key, "amqp/list");
+        QPID_LOG(trace, "SENT DataIndication");
+    }
 }
 
 
@@ -1364,13 +1378,26 @@ bool ManagementAgentImpl::ConnectionThre
 
 void ManagementAgentImpl::PublishThread::run()
 {
-    uint16_t    totalSleep;
+    uint16_t totalSleep;
+    uint16_t sleepTime;
 
     while (!shutdown) {
         agent.periodicProcessing();
         totalSleep = 0;
-        while (totalSleep++ < agent.getInterval() && !shutdown) {
-            ::sleep(1);
+
+        //
+        // Calculate a sleep time that is no greater than 5 seconds and
+        // no less than 1 second.
+        //
+        sleepTime = agent.getInterval();
+        if (sleepTime > 5)
+            sleepTime = 5;
+        else if (sleepTime == 0)
+            sleepTime = 1;
+
+        while (totalSleep < agent.getInterval() && !shutdown) {
+            ::sleep(sleepTime);
+            totalSleep += sleepTime;
         }
     }
 }

Modified: qpid/branches/QPID-2519/cpp/src/qpid/agent/ManagementAgentImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/agent/ManagementAgentImpl.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/agent/ManagementAgentImpl.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/agent/ManagementAgentImpl.h Thu Oct 20 18:42:46 2011
@@ -62,8 +62,8 @@ class ManagementAgentImpl : public Manag
               uint16_t intervalSeconds = 10,
               bool useExternalThread = false,
               const std::string& storeFile = "",
-              const std::string& uid = "guest",
-              const std::string& pwd = "guest",
+              const std::string& uid = "",
+              const std::string& pwd = "",
               const std::string& mech = "PLAIN",
               const std::string& proto = "tcp");
     void init(const management::ConnectionSettings& settings,

Modified: qpid/branches/QPID-2519/cpp/src/qpid/amqp_0_10/Codecs.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/amqp_0_10/Codecs.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/amqp_0_10/Codecs.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/amqp_0_10/Codecs.cpp Thu Oct 20 18:42:46 2011
@@ -127,10 +127,10 @@ Variant toVariant(boost::shared_ptr<Fiel
     switch (in->getType()) {
         //Fixed Width types:
       case 0x01: out.setEncoding(amqp0_10_binary);
-      case 0x02: out = in->getIntegerValue<int8_t, 1>(); break;
-      case 0x03: out = in->getIntegerValue<uint8_t, 1>(); break;
+      case 0x02: out = in->getIntegerValue<int8_t>(); break;
+      case 0x03: out = in->getIntegerValue<uint8_t>(); break;
       case 0x04: break; //TODO: iso-8859-15 char
-      case 0x08: out = static_cast<bool>(in->getIntegerValue<uint8_t, 1>()); break;
+      case 0x08: out = static_cast<bool>(in->getIntegerValue<uint8_t>()); break;
       case 0x10: out.setEncoding(amqp0_10_binary);
       case 0x11: out = in->getIntegerValue<int16_t, 2>(); break;
       case 0x12: out = in->getIntegerValue<uint16_t, 2>(); break;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/amqp_0_10/SessionHandler.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/amqp_0_10/SessionHandler.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/amqp_0_10/SessionHandler.cpp Thu Oct 20 18:42:46 2011
@@ -130,9 +130,6 @@ void SessionHandler::handleException(con
 }
 
 namespace {
-bool isControl(const AMQFrame& f) {
-    return f.getMethod() && f.getMethod()->type() == framing::SEGMENT_TYPE_CONTROL;
-}
 bool isCommand(const AMQFrame& f) {
     return f.getMethod() && f.getMethod()->type() == framing::SEGMENT_TYPE_COMMAND;
 }
@@ -191,9 +188,10 @@ void SessionHandler::detach(const std::s
 void SessionHandler::detached(const std::string& name, uint8_t code) {
     CHECK_NAME(name, "session.detached");
     awaitingDetached = false;
-    if (code != session::DETACH_CODE_NORMAL)
+    if (code != session::DETACH_CODE_NORMAL) {
+        sendReady = receiveReady = false;
         channelException(convert(code), "session.detached from peer.");
-    else {
+    } else {
         handleDetach();
     }
 }

Modified: qpid/branches/QPID-2519/cpp/src/qpid/amqp_0_10/SessionHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/amqp_0_10/SessionHandler.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/amqp_0_10/SessionHandler.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/amqp_0_10/SessionHandler.h Thu Oct 20 18:42:46 2011
@@ -41,8 +41,8 @@ namespace amqp_0_10 {
  * to a session state.
  */
 
-class SessionHandler : public framing::AMQP_AllOperations::SessionHandler,
-                       public framing::FrameHandler::InOutHandler
+class QPID_COMMON_CLASS_EXTERN SessionHandler : public framing::AMQP_AllOperations::SessionHandler,
+        public framing::FrameHandler::InOutHandler
 {
   public:
     QPID_COMMON_EXTERN SessionHandler(framing::FrameHandler* out=0, uint16_t channel=0);
@@ -66,7 +66,7 @@ class SessionHandler : public framing::A
     QPID_COMMON_EXTERN void handleException(const qpid::SessionException& e);
 
     /** True if the handler is ready to send and receive */
-    bool ready() const;
+    QPID_COMMON_EXTERN bool ready() const;
 
     // Protocol methods
     QPID_COMMON_EXTERN void attach(const std::string& name, bool force);

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/Bridge.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/Bridge.cpp?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/Bridge.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/Bridge.cpp Thu Oct 20 18:42:46 2011
@@ -164,6 +164,12 @@ void Bridge::destroy()
     listener(this);
 }
 
+bool Bridge::isSessionReady() const
+{
+    SessionHandler& sessionHandler = conn->getChannel(id);
+    return sessionHandler.ready();
+}
+
 void Bridge::setPersistenceId(uint64_t pId) const
 {
     persistenceId = pId;

Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/Bridge.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/Bridge.h?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/Bridge.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/Bridge.h Thu Oct 20 18:42:46 2011
@@ -59,6 +59,8 @@ public:
     void destroy();
     bool isDurable() { return args.i_durable; }
 
+    bool isSessionReady() const;
+
     management::ManagementObject* GetManagementObject() const;
     management::Manageable::status_t ManagementMethod(uint32_t methodId,
                                                       management::Args& args,



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org