You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 03:20:13 UTC
svn commit: r1187150 [6/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/
cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/
cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/
cpp/bindings/qmf2/python/ cpp/bindings/qmf2...
Modified: qpid/branches/QPID-2519/cpp/src/qmf/ConsoleSession.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qmf/ConsoleSession.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qmf/ConsoleSession.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qmf/ConsoleSession.cpp Fri Oct 21 01:19:00 2011
@@ -54,7 +54,6 @@ void ConsoleSession::setAgentFilter(cons
void ConsoleSession::open() { impl->open(); }
void ConsoleSession::close() { impl->close(); }
bool ConsoleSession::nextEvent(ConsoleEvent& e, Duration t) { return impl->nextEvent(e, t); }
-int ConsoleSession::pendingEvents() const { return impl->pendingEvents(); }
uint32_t ConsoleSession::getAgentCount() const { return impl->getAgentCount(); }
Agent ConsoleSession::getAgent(uint32_t i) const { return impl->getAgent(i); }
Agent ConsoleSession::getConnectedBrokerAgent() const { return impl->getConnectedBrokerAgent(); }
@@ -66,9 +65,9 @@ Subscription ConsoleSession::subscribe(c
//========================================================================================
ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) :
- connection(c), domain("default"), maxAgentAgeMinutes(5), listenOnDirect(true), strictSecurity(false), maxThreadWaitTime(5),
- opened(false), eventNotifier(0), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0),
- connectedBrokerInAgentList(false), schemaCache(new SchemaCache()), nextCorrelator(1)
+ connection(c), domain("default"), maxAgentAgeMinutes(5),
+ opened(false), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0),
+ connectedBrokerInAgentList(false), schemaCache(new SchemaCache())
{
if (!options.empty()) {
qpid::messaging::AddressParser parser(options);
@@ -92,14 +91,7 @@ ConsoleSessionImpl::ConsoleSessionImpl(C
iter = optMap.find("strict-security");
if (iter != optMap.end())
strictSecurity = iter->second.asBool();
-
- iter = optMap.find("max-thread-wait-time");
- if (iter != optMap.end())
- maxThreadWaitTime = iter->second.asUint32();
}
-
- if (maxThreadWaitTime > 60)
- maxThreadWaitTime = 60;
}
@@ -107,11 +99,6 @@ ConsoleSessionImpl::~ConsoleSessionImpl(
{
if (opened)
close();
-
- if (thread) {
- thread->join();
- delete thread;
- }
}
@@ -166,12 +153,6 @@ void ConsoleSessionImpl::open()
if (opened)
throw QmfException("The session is already open");
- // If the thread exists, join and delete it before creating a new one.
- if (thread) {
- thread->join();
- delete thread;
- }
-
// Establish messaging addresses
directBase = "qmf." + domain + ".direct";
topicBase = "qmf." + domain + ".topic";
@@ -200,36 +181,30 @@ void ConsoleSessionImpl::open()
// Start the receiver thread
threadCanceled = false;
- opened = true;
thread = new qpid::sys::Thread(*this);
// Send an agent_locate to direct address 'broker' to identify the connected-broker-agent.
sendBrokerLocate();
if (agentQuery)
sendAgentLocate();
+
+ opened = true;
}
-void ConsoleSessionImpl::closeAsync()
+void ConsoleSessionImpl::close()
{
if (!opened)
throw QmfException("The session is already closed");
- // Stop the receiver thread. Don't join it until the destructor is called or open() is called.
+ // Stop and join the receiver thread
threadCanceled = true;
- opened = false;
-}
-
+ thread->join();
+ delete thread;
-void ConsoleSessionImpl::close()
-{
- closeAsync();
-
- if (thread) {
- thread->join();
- delete thread;
- thread = 0;
- }
+ // Close the AMQP session
+ session.close();
+ opened = false;
}
@@ -238,19 +213,13 @@ bool ConsoleSessionImpl::nextEvent(Conso
uint64_t milliseconds = timeout.getMilliseconds();
qpid::sys::Mutex::ScopedLock l(lock);
- if (eventQueue.empty() && milliseconds > 0) {
- int64_t nsecs(qpid::sys::TIME_INFINITE);
- if ((uint64_t)(nsecs / 1000000) > milliseconds)
- nsecs = (int64_t) milliseconds * 1000000;
- qpid::sys::Duration then(nsecs);
- cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), then));
- }
+ if (eventQueue.empty())
+ cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(),
+ qpid::sys::Duration(milliseconds * qpid::sys::TIME_MSEC)));
if (!eventQueue.empty()) {
event = eventQueue.front();
eventQueue.pop();
- if (eventQueue.empty())
- alertEventNotifierLH(false);
return true;
}
@@ -258,27 +227,6 @@ bool ConsoleSessionImpl::nextEvent(Conso
}
-int ConsoleSessionImpl::pendingEvents() const
-{
- qpid::sys::Mutex::ScopedLock l(lock);
- return eventQueue.size();
-}
-
-
-void ConsoleSessionImpl::setEventNotifier(EventNotifierImpl* notifier)
-{
- qpid::sys::Mutex::ScopedLock l(lock);
- eventNotifier = notifier;
-}
-
-
-EventNotifierImpl* ConsoleSessionImpl::getEventNotifier() const
-{
- qpid::sys::Mutex::ScopedLock l(lock);
- return eventNotifier;
-}
-
-
uint32_t ConsoleSessionImpl::getAgentCount() const
{
qpid::sys::Mutex::ScopedLock l(lock);
@@ -320,10 +268,8 @@ void ConsoleSessionImpl::enqueueEventLH(
{
bool notify = eventQueue.empty();
eventQueue.push(event);
- if (notify) {
+ if (notify)
cond.notify();
- alertEventNotifierLH(true);
- }
}
@@ -475,23 +421,7 @@ void ConsoleSessionImpl::handleAgentUpda
iter = content.find("_values");
if (iter == content.end())
return;
- const Variant::Map& in_attrs(iter->second.asMap());
- Variant::Map attrs;
-
- //
- // Copy the map from the message to "attrs". Translate any old-style
- // keys to their new key values in the process.
- //
- for (iter = in_attrs.begin(); iter != in_attrs.end(); iter++) {
- if (iter->first == "epoch")
- attrs[protocol::AGENT_ATTR_EPOCH] = iter->second;
- else if (iter->first == "timestamp")
- attrs[protocol::AGENT_ATTR_TIMESTAMP] = iter->second;
- else if (iter->first == "heartbeat_interval")
- attrs[protocol::AGENT_ATTR_HEARTBEAT_INTERVAL] = iter->second;
- else
- attrs[iter->first] = iter->second;
- }
+ Variant::Map attrs(iter->second.asMap());
iter = attrs.find(protocol::AGENT_ATTR_EPOCH);
if (iter != attrs.end())
@@ -632,13 +562,6 @@ void ConsoleSessionImpl::periodicProcess
}
-void ConsoleSessionImpl::alertEventNotifierLH(bool readable)
-{
- if (eventNotifier)
- eventNotifier->setReadable(readable);
-}
-
-
void ConsoleSessionImpl::run()
{
QPID_LOG(debug, "ConsoleSession thread started");
@@ -649,7 +572,7 @@ void ConsoleSessionImpl::run()
qpid::sys::TIME_SEC);
Receiver rx;
- bool valid = session.nextReceiver(rx, Duration::SECOND * maxThreadWaitTime);
+ bool valid = session.nextReceiver(rx, Duration::SECOND);
if (threadCanceled)
break;
if (valid) {
@@ -666,18 +589,6 @@ void ConsoleSessionImpl::run()
enqueueEvent(ConsoleEvent(new ConsoleEventImpl(CONSOLE_THREAD_FAILED)));
}
- session.close();
QPID_LOG(debug, "ConsoleSession thread exiting");
}
-
-ConsoleSessionImpl& ConsoleSessionImplAccess::get(ConsoleSession& session)
-{
- return *session.impl;
-}
-
-
-const ConsoleSessionImpl& ConsoleSessionImplAccess::get(const ConsoleSession& session)
-{
- return *session.impl;
-}
Modified: qpid/branches/QPID-2519/cpp/src/qmf/ConsoleSessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qmf/ConsoleSessionImpl.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qmf/ConsoleSessionImpl.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qmf/ConsoleSessionImpl.h Fri Oct 21 01:19:00 2011
@@ -27,7 +27,6 @@
#include "qmf/SchemaId.h"
#include "qmf/Schema.h"
#include "qmf/ConsoleEventImpl.h"
-#include "qmf/EventNotifierImpl.h"
#include "qmf/SchemaCache.h"
#include "qmf/Query.h"
#include "qpid/sys/Mutex.h"
@@ -42,13 +41,9 @@
#include "qpid/messaging/Address.h"
#include "qpid/management/Buffer.h"
#include "qpid/types/Variant.h"
-
-#include <boost/shared_ptr.hpp>
#include <map>
#include <queue>
-using namespace std;
-
namespace qmf {
class ConsoleSessionImpl : public virtual qpid::RefCounted, public qpid::sys::Runnable {
public:
@@ -61,14 +56,8 @@ namespace qmf {
void setDomain(const std::string& d) { domain = d; }
void setAgentFilter(const std::string& f);
void open();
- void closeAsync();
void close();
bool nextEvent(ConsoleEvent& e, qpid::messaging::Duration t);
- int pendingEvents() const;
-
- void setEventNotifier(EventNotifierImpl* notifier);
- EventNotifierImpl* getEventNotifier() const;
-
uint32_t getAgentCount() const;
Agent getAgent(uint32_t i) const;
Agent getConnectedBrokerAgent() const { return connectedBrokerAgent; }
@@ -86,11 +75,9 @@ namespace qmf {
uint32_t maxAgentAgeMinutes;
bool listenOnDirect;
bool strictSecurity;
- uint32_t maxThreadWaitTime;
Query agentQuery;
bool opened;
std::queue<ConsoleEvent> eventQueue;
- EventNotifierImpl* eventNotifier;
qpid::sys::Thread* thread;
bool threadCanceled;
uint64_t lastVisit;
@@ -102,8 +89,6 @@ namespace qmf {
std::string directBase;
std::string topicBase;
boost::shared_ptr<SchemaCache> schemaCache;
- qpid::sys::Mutex corrlock;
- uint32_t nextCorrelator;
void enqueueEvent(const ConsoleEvent&);
void enqueueEventLH(const ConsoleEvent&);
@@ -113,17 +98,10 @@ namespace qmf {
void handleAgentUpdate(const std::string&, const qpid::types::Variant::Map&, const qpid::messaging::Message&);
void handleV1SchemaResponse(qpid::management::Buffer&, uint32_t, const qpid::messaging::Message&);
void periodicProcessing(uint64_t);
- void alertEventNotifierLH(bool readable);
void run();
- uint32_t correlator() { qpid::sys::Mutex::ScopedLock l(corrlock); return nextCorrelator++; }
friend class AgentImpl;
};
-
- struct ConsoleSessionImplAccess {
- static ConsoleSessionImpl& get(ConsoleSession& session);
- static const ConsoleSessionImpl& get(const ConsoleSession& session);
- };
}
#endif
Modified: qpid/branches/QPID-2519/cpp/src/qmf/DataAddr.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qmf/DataAddr.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qmf/DataAddr.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qmf/DataAddr.cpp Fri Oct 21 01:19:00 2011
@@ -36,9 +36,7 @@ DataAddr::~DataAddr() { PI::dtor(*this);
DataAddr& DataAddr::operator=(const DataAddr& s) { return PI::assign(*this, s); }
bool DataAddr::operator==(const DataAddr& o) { return *impl == *o.impl; }
-bool DataAddr::operator==(const DataAddr& o) const { return *impl == *o.impl; }
bool DataAddr::operator<(const DataAddr& o) { return *impl < *o.impl; }
-bool DataAddr::operator<(const DataAddr& o) const { return *impl < *o.impl; }
DataAddr::DataAddr(const qpid::types::Variant::Map& m) { PI::ctor(*this, new DataAddrImpl(m)); }
DataAddr::DataAddr(const string& n, const string& a, uint32_t e) { PI::ctor(*this, new DataAddrImpl(n, a, e)); }
@@ -47,7 +45,7 @@ const string& DataAddr::getAgentName() c
uint32_t DataAddr::getAgentEpoch() const { return impl->getAgentEpoch(); }
Variant::Map DataAddr::asMap() const { return impl->asMap(); }
-bool DataAddrImpl::operator==(const DataAddrImpl& other) const
+bool DataAddrImpl::operator==(const DataAddrImpl& other)
{
return
agentName == other.agentName &&
@@ -56,7 +54,7 @@ bool DataAddrImpl::operator==(const Data
}
-bool DataAddrImpl::operator<(const DataAddrImpl& other) const
+bool DataAddrImpl::operator<(const DataAddrImpl& other)
{
if (agentName < other.agentName) return true;
if (agentName > other.agentName) return false;
Modified: qpid/branches/QPID-2519/cpp/src/qmf/DataAddrImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qmf/DataAddrImpl.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qmf/DataAddrImpl.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qmf/DataAddrImpl.h Fri Oct 21 01:19:00 2011
@@ -38,8 +38,8 @@ namespace qmf {
//
// Methods from API handle
//
- bool operator==(const DataAddrImpl&) const;
- bool operator<(const DataAddrImpl&) const;
+ bool operator==(const DataAddrImpl&);
+ bool operator<(const DataAddrImpl&);
DataAddrImpl(const qpid::types::Variant::Map&);
DataAddrImpl(const std::string& _name, const std::string& _agentName, uint32_t _agentEpoch=0) :
agentName(_agentName), name(_name), agentEpoch(_agentEpoch) {}
Modified: qpid/branches/QPID-2519/cpp/src/qmf/PrivateImplRef.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qmf/PrivateImplRef.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qmf/PrivateImplRef.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qmf/PrivateImplRef.h Fri Oct 21 01:19:00 2011
@@ -23,8 +23,8 @@
*/
#include "qmf/ImportExport.h"
-#include "qpid/RefCounted.h"
#include <boost/intrusive_ptr.hpp>
+#include "qpid/RefCounted.h"
namespace qmf {
Propchange: qpid/branches/QPID-2519/cpp/src/qmf/engine/Agent.cpp
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 21 01:19:00 2011
@@ -1,2 +0,0 @@
-/qpid/branches/qpid-2935/qpid/cpp/src/qmf/engine/Agent.cpp:1061302-1072333
-/qpid/trunk/qpid/cpp/src/qmf/engine/Agent.cpp:1072051-1185907
Modified: qpid/branches/QPID-2519/cpp/src/qmf/engine/ResilientConnection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qmf/engine/ResilientConnection.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qmf/engine/ResilientConnection.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qmf/engine/ResilientConnection.cpp Fri Oct 21 01:19:00 2011
@@ -334,7 +334,8 @@ void ResilientConnectionImpl::notify()
{
if (notifyFd != -1)
{
- (void) ::write(notifyFd, ".", 1);
+ int unused_ret; //Suppress warnings about ignoring return value.
+ unused_ret = ::write(notifyFd, ".", 1);
}
}
@@ -431,7 +432,8 @@ void ResilientConnectionImpl::EnqueueEve
if (notifyFd != -1)
{
- (void) ::write(notifyFd, ".", 1);
+ int unused_ret; //Suppress warnings about ignoring return value.
+ unused_ret = ::write(notifyFd, ".", 1);
}
}
Modified: qpid/branches/QPID-2519/cpp/src/qmf/engine/SchemaImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qmf/engine/SchemaImpl.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qmf/engine/SchemaImpl.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qmf/engine/SchemaImpl.cpp Fri Oct 21 01:19:00 2011
@@ -35,17 +35,17 @@ using qpid::framing::Uuid;
SchemaHash::SchemaHash()
{
for (int idx = 0; idx < 16; idx++)
- hash.b[idx] = 0x5A;
+ hash[idx] = 0x5A;
}
void SchemaHash::encode(Buffer& buffer) const
{
- buffer.putBin128(hash.b);
+ buffer.putBin128(hash);
}
void SchemaHash::decode(Buffer& buffer)
{
- buffer.getBin128(hash.b);
+ buffer.getBin128(hash);
}
void SchemaHash::update(uint8_t data)
@@ -55,8 +55,9 @@ void SchemaHash::update(uint8_t data)
void SchemaHash::update(const char* data, uint32_t len)
{
- uint64_t* first = &hash.q[0];
- uint64_t* second = &hash.q[1];
+ uint64_t* first = (uint64_t*) hash;
+ uint64_t* second = (uint64_t*) hash + 1;
+
for (uint32_t idx = 0; idx < len; idx++) {
*first = *first ^ (uint64_t) data[idx];
*second = *second << 1;
Modified: qpid/branches/QPID-2519/cpp/src/qmf/engine/SchemaImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qmf/engine/SchemaImpl.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qmf/engine/SchemaImpl.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qmf/engine/SchemaImpl.h Fri Oct 21 01:19:00 2011
@@ -35,10 +35,7 @@ namespace engine {
// they've been registered.
class SchemaHash {
- union h {
- uint8_t b[16];
- uint64_t q[2];
- } hash;
+ uint8_t hash[16];
public:
SchemaHash();
void encode(qpid::framing::Buffer& buffer) const;
@@ -50,7 +47,7 @@ namespace engine {
void update(Direction d) { update((uint8_t) d); }
void update(Access a) { update((uint8_t) a); }
void update(bool b) { update((uint8_t) (b ? 1 : 0)); }
- const uint8_t* get() const { return hash.b; }
+ const uint8_t* get() const { return hash; }
bool operator==(const SchemaHash& other) const;
bool operator<(const SchemaHash& other) const;
bool operator>(const SchemaHash& other) const;
Modified: qpid/branches/QPID-2519/cpp/src/qpid/Address.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/Address.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/Address.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/Address.cpp Fri Oct 21 01:19:00 2011
@@ -28,13 +28,7 @@ namespace qpid {
const string Address::TCP("tcp");
ostream& operator<<(ostream& os, const Address& a) {
- // If the host is an IPv6 literal we need to print "[]" around it
- // (we detect IPv6 literals because they contain ":" which is otherwise illegal)
- if (a.host.find(':') != string::npos) {
- return os << a.protocol << ":[" << a.host << "]:" << a.port;
- } else {
- return os << a.protocol << ":" << a.host << ":" << a.port;
- }
+ return os << a.protocol << ":" << a.host << ":" << a.port;
}
bool operator==(const Address& x, const Address& y) {
Modified: qpid/branches/QPID-2519/cpp/src/qpid/Exception.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/Exception.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/Exception.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/Exception.cpp Fri Oct 21 01:19:00 2011
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,25 +21,13 @@
#include "qpid/log/Statement.h"
#include "qpid/Exception.h"
-#include "qpid/DisableExceptionLogging.h"
#include <typeinfo>
#include <assert.h>
#include <string.h>
namespace qpid {
-// Note on static initialization order: if an exception is constructed
-// in a static constructor before disableExceptionLogging has been
-// initialized, the worst that can happen is we lose an exception log
-// message. Since we shouldn't be throwing a lot of exceptions during
-// static construction this seems safe.
-static bool disableExceptionLogging = false;
-
-DisableExceptionLogging::DisableExceptionLogging() { disableExceptionLogging = true; }
-DisableExceptionLogging::~DisableExceptionLogging() { disableExceptionLogging = false; }
-
Exception::Exception(const std::string& msg) throw() : message(msg) {
- if (disableExceptionLogging) return;
QPID_LOG_IF(debug, !msg.empty(), "Exception constructed: " << message);
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/Modules.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/Modules.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/Modules.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/Modules.cpp Fri Oct 21 01:19:00 2011
@@ -64,6 +64,7 @@ void tryShlib(const char* libname_, bool
if (!isShlibName(libname)) libname += suffix();
try {
sys::Shlib shlib(libname);
+ QPID_LOG (info, "Loaded Module: " << libname);
}
catch (const std::exception& /*e*/) {
if (!noThrow)
@@ -81,7 +82,7 @@ void loadModuleDir (std::string dirname,
return;
throw Exception ("Directory not found: " + dirname);
}
- if (!fs::is_directory(dirPath))
+ if (!fs::is_directory(dirPath))
{
throw Exception ("Invalid value for module-dir: " + dirname + " is not a directory");
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/Options.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/Options.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/Options.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/Options.cpp Fri Oct 21 01:19:00 2011
@@ -30,6 +30,23 @@ namespace qpid {
using namespace std;
+/*
+ * ---------------------------------------------
+ * Explanation for Boost 103200 conditional code
+ * ---------------------------------------------
+ *
+ * Please see large comment in Options.h .
+ *
+ */
+
+#if ( BOOST_VERSION == 103200 )
+std::vector<std::string> Options::long_names;
+std::vector<std::string> Options::short_names;
+#endif
+
+
+
+
namespace {
struct EnvOptMapper {
@@ -52,11 +69,49 @@ struct EnvOptMapper {
static const std::string prefix("QPID_");
if (envVar.substr(0, prefix.size()) == prefix) {
string env = envVar.substr(prefix.size());
+#if (BOOST_VERSION >= 103300)
typedef const std::vector< boost::shared_ptr<po::option_description> > OptDescs;
OptDescs::const_iterator i =
find_if(opts.options().begin(), opts.options().end(), boost::bind(matchStr, env, _1));
if (i != opts.options().end())
return (*i)->long_name();
+#else
+ /*
+ * For Boost version 103200 and below.
+ *
+ * In Boost version 103200, the options_description::options member,
+ * used above, is private. So what I will do here is use the
+ * count() funtion, which returns a 1 or 0 indicating presence or
+ * absence of the environment variable.
+ *
+ * If it is present, I will return its name. Env vars do not have
+ * short and long forms, so the name is synonymous with the long
+ * name. (This would not work for command line args.)
+ * And if it's absent -- an empty string.
+ */
+
+
+ /*
+ * The env vars come in unaltered, i.e. QPID_FOO, but the
+ * options are stored normalized as "qpid-foo". Change the
+ * local variable "env" so it can be found by "opts".
+ */
+ for (std::string::iterator i = env.begin(); i != env.end(); ++i)
+ {
+ *i = (*i == '_')
+ ? '-'
+ : ::tolower(*i);
+ }
+
+ if ( opts.count(env.c_str()) > 0 )
+ {
+ return env.c_str();
+ }
+ else
+ {
+ return string();
+ }
+#endif
}
return string();
}
@@ -111,6 +166,10 @@ std::string prettyArg(const std::string&
Options::Options(const string& name) :
po::options_description(name)
+
+#if ( BOOST_VERSION == 103200 )
+ , m_less_easy(this, this)
+#endif
{
}
@@ -127,6 +186,7 @@ void Options::parse(int argc, char const
parsing="command line options";
if (argc > 0 && argv != 0) {
if (allowUnknown) {
+#if (BOOST_VERSION >= 103300)
// This hideous workaround is required because boost 1.33 has a bug
// that causes 'allow_unregistered' to not work.
po::command_line_parser clp = po::command_line_parser(argc, const_cast<char**>(argv)).
@@ -140,6 +200,113 @@ void Options::parse(int argc, char const
filtopts.options.push_back (*i);
po::store(filtopts, vm);
+#elif ( BOOST_VERSION == 103200 )
+
+ /*
+ * "Tokenize" the argv to get rid of equals signs.
+ */
+ vector<string> tokenized_argv;
+ vector<string>::iterator iter;
+
+ for ( int i = 0; i < argc; ++ i )
+ {
+ string s = argv[i];
+ size_t equals_pos = s.find_first_of ( '=' );
+
+ if ( string::npos == equals_pos ) // There's no equals sign. This is a token.
+ {
+ tokenized_argv.push_back(s);
+ }
+ else
+ {
+ // Two tokens -- before and after the equals position.
+ tokenized_argv.push_back ( s.substr(0, equals_pos) );
+ tokenized_argv.push_back ( s.substr(equals_pos+1) );
+ }
+ }
+
+
+ /*
+ * Now "filter" the tokenized argv, to get rid of all
+ * unrecognized options. Because Boost 103200 has no
+ * facility for dealing gracefully with "unregistered"
+ * options.
+ */
+ vector<string> filtered_argv;
+ vector<string>::iterator the_end = tokenized_argv.end();
+
+ // The program-name gets in for free...
+ iter = tokenized_argv.begin();
+ filtered_argv.push_back ( * iter );
+ ++ iter;
+
+ // ...but all other args get checked.
+ while ( iter < the_end )
+ {
+ /*
+ * If this is an argument that is registered,
+ * copy it to filtered_argv and also copy all
+ * of its arguments.
+ */
+ if ( is_registered_option ( * iter ) )
+ {
+ // Store this recognized arg.
+ filtered_argv.push_back ( * iter );
+ ++ iter;
+
+ // Copy all values for the above arg.
+ // Args are tokens that do not start with a minus.
+ while ( (iter < the_end) && ((* iter)[0] != '-') )
+ {
+ filtered_argv.push_back ( * iter );
+ ++ iter;
+ }
+ }
+ else
+ {
+ // Skip this unrecognized arg.
+ ++ iter;
+
+ // Copy all values for the above arg.
+ // Values are tokens that do not start with a minus.
+ while ( (iter < the_end) && ( '-' != (*iter)[0] ) )
+ {
+ ++ iter;
+ }
+ }
+ }
+
+ // Make an array of temporary C strings, because
+ // the interface I can use wants it that way.
+ int new_argc = filtered_argv.size();
+ char ** new_argv = new char * [ new_argc ];
+ int i = 0;
+
+ // cout << "NEW ARGV: |";
+ for ( iter = filtered_argv.begin();
+ iter < filtered_argv.end();
+ ++ iter, ++ i
+ )
+ {
+ new_argv[i] = strdup( (* iter).c_str() );
+ // cout << " " << new_argv[i] ;
+ }
+ // cout << "|\n";
+
+
+ // Use the array of C strings.
+ po::basic_parsed_options<char> bpo = po::parse_command_line(new_argc, const_cast<char**>(new_argv), *this);
+ po::store(bpo, vm);
+
+
+ // Now free the temporary C strings.
+ for ( i = 0; i < new_argc; ++ i )
+ {
+ free ( new_argv[i] );
+ }
+ delete[] new_argv;
+
+#endif
}
else
po::store(po::parse_command_line(argc, const_cast<char**>(argv), *this), vm);
@@ -196,5 +363,107 @@ CommonOptions::CommonOptions(const strin
}
+
+
+#if ( BOOST_VERSION == 103200 )
+options_description_less_easy_init&
+options_description_less_easy_init::operator()(char const * name,
+ char const * description)
+{
+ // Snoop on the arguments....
+ owner->register_names ( name );
+ // ... then call parent function explicitly.
+ po::options_description_easy_init::operator() ( name, description );
+ return * this;
+}
+
+
+options_description_less_easy_init&
+options_description_less_easy_init::operator()(char const * name,
+ const po::value_semantic* s)
+{
+ // Snoop on the arguments....
+ owner->register_names ( name );
+ // ... then call parent function explicitly.
+ po::options_description_easy_init::operator() ( name, s );
+ return * this;
+}
+
+
+options_description_less_easy_init&
+options_description_less_easy_init::operator()(const char* name,
+ const po::value_semantic* s,
+ const char* description)
+{
+ // Snoop on the arguments....
+ owner->register_names ( name );
+ // ... then call parent function explicitly.
+ po::options_description_easy_init::operator() ( name, s, description );
+ return * this;
+}
+
+
+
+
+
+void
+Options::register_names ( std::string s )
+{
+
+ std::string::size_type comma_pos = s.find_first_of ( ',' );
+
+ if ( std::string::npos == comma_pos )
+ {
+ // There is no short-name.
+ long_names.push_back ( s );
+ }
+ else
+ {
+ std::string long_name = s.substr(0, comma_pos),
+ short_name = s.substr(comma_pos+1);
+ long_names .push_back ( long_name );
+ short_names.push_back ( short_name );
+ }
+
+ /*
+ * There is no way to tell when the adding of new options is finished,
+ * so I re-sort after each one.
+ */
+ std::sort ( long_names .begin(), long_names .end() );
+ std::sort ( short_names.begin(), short_names.end() );
+}
+
+
+
+
+
+bool
+Options::is_registered_option ( std::string s )
+{
+ std::string without_dashes = s.substr ( s.find_first_not_of ( '-' ) );
+ std::vector<std::string>::iterator i;
+
+ // Look among the long names.
+ i = std::find ( long_names.begin(),
+ long_names.end(),
+ without_dashes
+ );
+ if ( i != long_names.end() )
+ return true;
+
+ // Look among the short names.
+ i = std::find ( short_names.begin(),
+ short_names.end(),
+ without_dashes
+ );
+ if ( i != short_names.end() )
+ return true;
+
+
+ return false;
+}
+#endif
+
+
} // namespace qpid
Modified: qpid/branches/QPID-2519/cpp/src/qpid/RefCounted.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/RefCounted.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/RefCounted.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/RefCounted.h Fri Oct 21 01:19:00 2011
@@ -53,10 +53,8 @@ protected:
// intrusive_ptr support.
namespace boost {
-template <typename T>
-inline void intrusive_ptr_add_ref(const T* p) { p->qpid::RefCounted::addRef(); }
-template <typename T>
-inline void intrusive_ptr_release(const T* p) { p->qpid::RefCounted::release(); }
+inline void intrusive_ptr_add_ref(const qpid::RefCounted* p) { p->addRef(); }
+inline void intrusive_ptr_release(const qpid::RefCounted* p) { p->release(); }
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/RefCountedBuffer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/RefCountedBuffer.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/RefCountedBuffer.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/RefCountedBuffer.cpp Fri Oct 21 01:19:00 2011
@@ -20,27 +20,34 @@
*/
#include "qpid/RefCountedBuffer.h"
-#include <stdlib.h>
#include <new>
namespace qpid {
-void RefCountedBuffer::released() const {
+RefCountedBuffer::RefCountedBuffer() : count(0) {}
+
+void RefCountedBuffer::destroy() const {
this->~RefCountedBuffer();
- ::free (reinterpret_cast<void *>(const_cast<RefCountedBuffer *>(this)));
+ ::delete[] reinterpret_cast<const char*>(this);
+}
+
+char* RefCountedBuffer::addr() const {
+ return const_cast<char*>(reinterpret_cast<const char*>(this)+sizeof(RefCountedBuffer));
}
-BufferRef RefCountedBuffer::create(size_t n) {
- void* store=::malloc (n + sizeof(RefCountedBuffer));
- if (NULL == store)
- throw std::bad_alloc();
+RefCountedBuffer::pointer RefCountedBuffer::create(size_t n) {
+ char* store=::new char[n+sizeof(RefCountedBuffer)];
new(store) RefCountedBuffer;
- char* start = reinterpret_cast<char *>(store) + sizeof(RefCountedBuffer);
- return BufferRef(
- boost::intrusive_ptr<RefCounted>(reinterpret_cast<RefCountedBuffer*>(store)),
- start, start+n);
+ return pointer(reinterpret_cast<RefCountedBuffer*>(store));
}
+RefCountedBuffer::pointer::pointer() {}
+RefCountedBuffer::pointer::pointer(RefCountedBuffer* x) : p(x) {}
+RefCountedBuffer::pointer::pointer(const pointer& x) : p(x.p) {}
+RefCountedBuffer::pointer::~pointer() {}
+RefCountedBuffer::pointer& RefCountedBuffer::pointer::operator=(const RefCountedBuffer::pointer& x) { p = x.p; return *this; }
+
+char* RefCountedBuffer::pointer::cp() const { return p ? p->get() : 0; }
} // namespace qpid
Modified: qpid/branches/QPID-2519/cpp/src/qpid/RefCountedBuffer.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/RefCountedBuffer.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/RefCountedBuffer.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/RefCountedBuffer.h Fri Oct 21 01:19:00 2011
@@ -22,23 +22,68 @@
*
*/
-#include <qpid/RefCounted.h>
-#include <qpid/BufferRef.h>
+#include <boost/utility.hpp>
+#include <boost/detail/atomic_count.hpp>
+#include <boost/intrusive_ptr.hpp>
namespace qpid {
/**
- * Reference-counted byte buffer. No alignment guarantees.
+ * Reference-counted byte buffer.
+ * No alignment guarantees.
*/
-class RefCountedBuffer : public RefCounted {
- public:
+class RefCountedBuffer : boost::noncopyable {
+ mutable boost::detail::atomic_count count;
+ RefCountedBuffer();
+ void destroy() const;
+ char* addr() const;
+
+public:
+ /** Smart char pointer to a reference counted buffer */
+ class pointer {
+ boost::intrusive_ptr<RefCountedBuffer> p;
+ char* cp() const;
+ pointer(RefCountedBuffer* x);
+ friend class RefCountedBuffer;
+
+ public:
+ pointer();
+ pointer(const pointer&);
+ ~pointer();
+ pointer& operator=(const pointer&);
+
+ char* get() { return cp(); }
+ operator char*() { return cp(); }
+ char& operator*() { return *cp(); }
+ char& operator[](size_t i) { return cp()[i]; }
+
+ const char* get() const { return cp(); }
+ operator const char*() const { return cp(); }
+ const char& operator*() const { return *cp(); }
+ const char& operator[](size_t i) const { return cp()[i]; }
+ };
+
/** Create a reference counted buffer of size n */
- static BufferRef create(size_t n);
+ static pointer create(size_t n);
- protected:
- void released() const;
+ /** Get a pointer to the start of the buffer. */
+ char* get() { return addr(); }
+ const char* get() const { return addr(); }
+ char& operator[](size_t i) { return get()[i]; }
+ const char& operator[](size_t i) const { return get()[i]; }
+
+ void addRef() const { ++count; }
+ void release() const { if (--count==0) destroy(); }
+ long refCount() { return count; }
};
} // namespace qpid
+// intrusive_ptr support.
+namespace boost {
+inline void intrusive_ptr_add_ref(const qpid::RefCountedBuffer* p) { p->addRef(); }
+inline void intrusive_ptr_release(const qpid::RefCountedBuffer* p) { p->release(); }
+}
+
+
#endif /*!QPID_REFCOUNTEDBUFFER_H*/
Modified: qpid/branches/QPID-2519/cpp/src/qpid/Sasl.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/Sasl.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/Sasl.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/Sasl.h Fri Oct 21 01:19:00 2011
@@ -47,8 +47,8 @@ class Sasl
* client supports.
* @param externalSecuritySettings security related details from the underlying transport
*/
- virtual bool start(const std::string& mechanisms, std::string& response,
- const qpid::sys::SecuritySettings* externalSecuritySettings = 0) = 0;
+ virtual std::string start(const std::string& mechanisms,
+ const qpid::sys::SecuritySettings* externalSecuritySettings = 0) = 0;
virtual std::string step(const std::string& challenge) = 0;
virtual std::string getMechanism() = 0;
virtual std::string getUserId() = 0;
Modified: qpid/branches/QPID-2519/cpp/src/qpid/SaslFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/SaslFactory.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/SaslFactory.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/SaslFactory.cpp Fri Oct 21 01:19:00 2011
@@ -112,7 +112,7 @@ class CyrusSasl : public Sasl
public:
CyrusSasl(const std::string & username, const std::string & password, const std::string & serviceName, const std::string & hostName, int minSsf, int maxSsf, bool allowInteraction);
~CyrusSasl();
- bool start(const std::string& mechanisms, std::string& response, const SecuritySettings* externalSettings);
+ std::string start(const std::string& mechanisms, const SecuritySettings* externalSettings);
std::string step(const std::string& challenge);
std::string getMechanism();
std::string getUserId();
@@ -182,17 +182,16 @@ CyrusSasl::CyrusSasl(const std::string &
callbacks[i].id = SASL_CB_AUTHNAME;
callbacks[i].proc = (CallbackProc*) &getUserFromSettings;
callbacks[i++].context = &settings;
-
- callbacks[i].id = SASL_CB_PASS;
- if (settings.password.empty()) {
- callbacks[i].proc = 0;
- callbacks[i++].context = 0;
- } else {
- callbacks[i].proc = (CallbackProc*) &getPasswordFromSettings;
- callbacks[i++].context = &settings;
- }
}
+ callbacks[i].id = SASL_CB_PASS;
+ if (settings.password.empty()) {
+ callbacks[i].proc = 0;
+ callbacks[i++].context = 0;
+ } else {
+ callbacks[i].proc = (CallbackProc*) &getPasswordFromSettings;
+ callbacks[i++].context = &settings;
+ }
callbacks[i].id = SASL_CB_LIST_END;
callbacks[i].proc = 0;
@@ -210,7 +209,7 @@ namespace {
const std::string SSL("ssl");
}
-bool CyrusSasl::start(const std::string& mechanisms, std::string& response, const SecuritySettings* externalSettings)
+std::string CyrusSasl::start(const std::string& mechanisms, const SecuritySettings* externalSettings)
{
QPID_LOG(debug, "CyrusSasl::start(" << mechanisms << ")");
int result = sasl_client_new(settings.service.c_str(),
@@ -283,12 +282,7 @@ bool CyrusSasl::start(const std::string&
mechanism = std::string(chosenMechanism);
QPID_LOG(debug, "CyrusSasl::start(" << mechanisms << "): selected "
<< mechanism << " response: '" << std::string(out, outlen) << "'");
- if (out) {
- response = std::string(out, outlen);
- return true;
- } else {
- return false;
- }
+ return std::string(out, outlen);
}
std::string CyrusSasl::step(const std::string& challenge)
Modified: qpid/branches/QPID-2519/cpp/src/qpid/Url.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/Url.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/Url.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/Url.cpp Fri Oct 21 01:19:00 2011
@@ -156,12 +156,11 @@ class UrlParser {
return false;
}
- // A liberal interpretation of http://www.ietf.org/rfc/rfc3986.txt.
- // Works for DNS names and and ipv4 and ipv6 literals
+ // TODO aconway 2008-11-20: this does not fully implement
+ // http://www.ietf.org/rfc/rfc3986.txt. Works for DNS names and
+ // ipv4 literals but won't handle ipv6.
//
bool host(string& h) {
- if (ip6literal(h)) return true;
-
const char* start=i;
while (unreserved() || pctEncoded())
;
@@ -170,22 +169,6 @@ class UrlParser {
return true;
}
- // This is a bit too liberal for IPv6 literal addresses, but probably good enough
- bool ip6literal(string& h) {
- if (literal("[")) {
- const char* start = i;
- while (hexDigit() || literal(":") || literal("."))
- ;
- const char* end = i;
- if ( end-start < 2 ) return false; // Smallest valid address is "::"
- if (literal("]")) {
- h.assign(start, end);
- return true;
- }
- }
- return false;
- }
-
bool unreserved() { return (::isalnum(*i) || ::strchr("-._~", *i)) && advance(); }
bool pctEncoded() { return literal("%") && hexDigit() && hexDigit(); }
Modified: qpid/branches/QPID-2519/cpp/src/qpid/acl/Acl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/acl/Acl.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/acl/Acl.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/acl/Acl.cpp Fri Oct 21 01:19:00 2011
@@ -180,10 +180,7 @@ Acl::Acl (AclValues& av, Broker& b): acl
{
case _qmf::Acl::METHOD_RELOADACLFILE :
readAclFile(text);
- if (text.empty())
- status = Manageable::STATUS_OK;
- else
- status = Manageable::STATUS_USER;
+ status = Manageable::STATUS_USER;
break;
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/acl/AclPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/acl/AclPlugin.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/acl/AclPlugin.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/acl/AclPlugin.cpp Fri Oct 21 01:19:00 2011
@@ -69,7 +69,7 @@ struct AclPlugin : public Plugin {
}
acl = new Acl(values, b);
- b.setAcl(acl.get());
+ b.setAcl(acl.get());
b.addFinalizer(boost::bind(&AclPlugin::shutdown, this));
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/agent/ManagementAgentImpl.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/agent/ManagementAgentImpl.cpp Fri Oct 21 01:19:00 2011
@@ -305,47 +305,43 @@ void ManagementAgentImpl::raiseEvent(con
"emerg", "alert", "crit", "error", "warn",
"note", "info", "debug"
};
- string content;
+ sys::Mutex::ScopedLock lock(agentLock);
+ Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE);
+ uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity;
stringstream key;
- Variant::Map headers;
- {
- sys::Mutex::ScopedLock lock(agentLock);
- Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE);
- uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity;
+ // key << "console.event." << assignedBrokerBank << "." << assignedAgentBank << "." <<
+ // event.getPackageName() << "." << event.getEventName();
+ key << "agent.ind.event." << keyifyNameStr(event.getPackageName())
+ << "." << keyifyNameStr(event.getEventName())
+ << "." << severityStr[sev]
+ << "." << vendorNameKey
+ << "." << productNameKey
+ << "." << instanceNameKey;
- // key << "console.event." << assignedBrokerBank << "." << assignedAgentBank << "." <<
- // event.getPackageName() << "." << event.getEventName();
- key << "agent.ind.event." << keyifyNameStr(event.getPackageName())
- << "." << keyifyNameStr(event.getEventName())
- << "." << severityStr[sev]
- << "." << vendorNameKey
- << "." << productNameKey
- << "." << instanceNameKey;
-
- Variant::Map map_;
- Variant::Map schemaId;
- Variant::Map values;
-
- map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(),
- event.getEventName(),
- event.getMd5Sum(),
- ManagementItem::CLASS_KIND_EVENT);
- event.mapEncode(values);
- map_["_values"] = values;
- map_["_timestamp"] = uint64_t(Duration(EPOCH, now()));
- map_["_severity"] = sev;
+ Variant::Map map_;
+ Variant::Map schemaId;
+ Variant::Map values;
+ Variant::Map headers;
+ string content;
- headers["method"] = "indication";
- headers["qmf.opcode"] = "_data_indication";
- headers["qmf.content"] = "_event";
- headers["qmf.agent"] = name_address;
+ map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(),
+ event.getEventName(),
+ event.getMd5Sum(),
+ ManagementItem::CLASS_KIND_EVENT);
+ event.mapEncode(values);
+ map_["_values"] = values;
+ map_["_timestamp"] = uint64_t(Duration(EPOCH, now()));
+ map_["_severity"] = sev;
- Variant::List list;
- list.push_back(map_);
- ListCodec::encode(list, content);
- }
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_event";
+ headers["qmf.agent"] = name_address;
+ Variant::List list;
+ list.push_back(map_);
+ ListCodec::encode(list, content);
connThreadBody.sendBuffer(content, "", headers, topicExchange, key.str(), "amqp/list");
}
@@ -525,12 +521,9 @@ void ManagementAgentImpl::sendException(
void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, const string& rte, const string& rtk)
{
+ sys::Mutex::ScopedLock lock(agentLock);
string packageName;
SchemaClassKey key;
- uint32_t outLen(0);
- char localBuffer[MA_BUFFER_SIZE];
- Buffer outBuffer(localBuffer, MA_BUFFER_SIZE);
- bool found(false);
inBuffer.getShortString(packageName);
inBuffer.getShortString(key.name);
@@ -538,29 +531,25 @@ void ManagementAgentImpl::handleSchemaRe
QPID_LOG(trace, "RCVD SchemaRequest: package=" << packageName << " class=" << key.name);
- {
- sys::Mutex::ScopedLock lock(agentLock);
- PackageMap::iterator pIter = packages.find(packageName);
- if (pIter != packages.end()) {
- ClassMap& cMap = pIter->second;
- ClassMap::iterator cIter = cMap.find(key);
- if (cIter != cMap.end()) {
- SchemaClass& schema = cIter->second;
- string body;
-
- encodeHeader(outBuffer, 's', sequence);
- schema.writeSchemaCall(body);
- outBuffer.putRawData(body);
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- found = true;
- }
- }
- }
+ PackageMap::iterator pIter = packages.find(packageName);
+ if (pIter != packages.end()) {
+ ClassMap& cMap = pIter->second;
+ ClassMap::iterator cIter = cMap.find(key);
+ if (cIter != cMap.end()) {
+ SchemaClass& schema = cIter->second;
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+ string body;
+
+ encodeHeader(outBuffer, 's', sequence);
+ schema.writeSchemaCall(body);
+ outBuffer.putRawData(body);
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
+ outBuffer.reset();
+ connThreadBody.sendBuffer(outBuffer, outLen, rte, rtk);
- if (found) {
- connThreadBody.sendBuffer(outBuffer, outLen, rte, rtk);
- QPID_LOG(trace, "SENT SchemaInd: package=" << packageName << " class=" << key.name);
+ QPID_LOG(trace, "SENT SchemaInd: package=" << packageName << " class=" << key.name);
+ }
}
}
@@ -980,6 +969,18 @@ ManagementAgentImpl::PackageMap::iterato
pair<PackageMap::iterator, bool> result =
packages.insert(pair<string, ClassMap>(name, ClassMap()));
+ if (connected) {
+ // Publish a package-indication message
+ Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
+ uint32_t outLen;
+
+ encodeHeader(outBuffer, 'p');
+ encodePackageIndication(outBuffer, result.first);
+ outLen = MA_BUFFER_SIZE - outBuffer.available();
+ outBuffer.reset();
+ connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", "schema.package");
+ }
+
return result.first;
}
@@ -1037,146 +1038,131 @@ void ManagementAgentImpl::encodeClassInd
QPID_LOG(trace, "SENT ClassInd: package=" << (*pIter).first << " class=" << key.name);
}
-struct MessageItem {
- string content;
- Variant::Map headers;
- string key;
- MessageItem(const Variant::Map& h, const string& k) : headers(h), key(k) {}
-};
-
void ManagementAgentImpl::periodicProcessing()
{
string addr_key_base = "agent.ind.data.";
+ sys::Mutex::ScopedLock lock(agentLock);
list<ObjectId> deleteList;
- list<boost::shared_ptr<MessageItem> > message_list;
+
+ if (!connected)
+ return;
sendHeartbeat();
- {
- sys::Mutex::ScopedLock lock(agentLock);
+ moveNewObjectsLH();
- if (!connected)
- return;
+ //
+ // Clear the been-here flag on all objects in the map.
+ //
+ for (ObjectMap::iterator iter = managementObjects.begin();
+ iter != managementObjects.end();
+ iter++) {
+ ManagementObject* object = iter->second.get();
+ object->setFlags(0);
+ if (publishAllData) {
+ object->setForcePublish(true);
+ }
+ }
+
+ publishAllData = false;
- moveNewObjectsLH();
+ //
+ // Process the entire object map.
+ //
+ uint32_t v2Objs = 0;
+
+ for (ObjectMap::iterator baseIter = managementObjects.begin();
+ baseIter != managementObjects.end();
+ baseIter++) {
+ ManagementObject* baseObject = baseIter->second.get();
//
- // Clear the been-here flag on all objects in the map.
+ // Skip until we find a base object requiring a sent message.
//
- for (ObjectMap::iterator iter = managementObjects.begin();
+ if (baseObject->getFlags() == 1 ||
+ (!baseObject->getConfigChanged() &&
+ !baseObject->getInstChanged() &&
+ !baseObject->getForcePublish() &&
+ !baseObject->isDeleted()))
+ continue;
+
+ std::string packageName = baseObject->getPackageName();
+ std::string className = baseObject->getClassName();
+
+ Variant::List list_;
+ string content;
+ std::stringstream addr_key;
+ Variant::Map headers;
+
+ addr_key << addr_key_base;
+ addr_key << keyifyNameStr(packageName)
+ << "." << keyifyNameStr(className)
+ << "." << vendorNameKey
+ << "." << productNameKey
+ << "." << instanceNameKey;
+
+ headers["method"] = "indication";
+ headers["qmf.opcode"] = "_data_indication";
+ headers["qmf.content"] = "_data";
+ headers["qmf.agent"] = name_address;
+
+ for (ObjectMap::iterator iter = baseIter;
iter != managementObjects.end();
iter++) {
ManagementObject* object = iter->second.get();
- object->setFlags(0);
- if (publishAllData) {
- object->setForcePublish(true);
- }
- }
-
- publishAllData = false;
-
- //
- // Process the entire object map.
- //
- uint32_t v2Objs = 0;
-
- for (ObjectMap::iterator baseIter = managementObjects.begin();
- baseIter != managementObjects.end();
- baseIter++) {
- ManagementObject* baseObject = baseIter->second.get();
-
- //
- // Skip until we find a base object requiring a sent message.
- //
- if (baseObject->getFlags() == 1 ||
- (!baseObject->getConfigChanged() &&
- !baseObject->getInstChanged() &&
- !baseObject->getForcePublish() &&
- !baseObject->isDeleted()))
- continue;
-
- std::string packageName = baseObject->getPackageName();
- std::string className = baseObject->getClassName();
-
- Variant::List list_;
- std::stringstream addr_key;
- Variant::Map headers;
-
- addr_key << addr_key_base;
- addr_key << keyifyNameStr(packageName)
- << "." << keyifyNameStr(className)
- << "." << vendorNameKey
- << "." << productNameKey
- << "." << instanceNameKey;
-
- headers["method"] = "indication";
- headers["qmf.opcode"] = "_data_indication";
- headers["qmf.content"] = "_data";
- headers["qmf.agent"] = name_address;
-
- for (ObjectMap::iterator iter = baseIter;
- iter != managementObjects.end();
- iter++) {
- ManagementObject* object = iter->second.get();
- bool send_stats, send_props;
- if (baseObject->isSameClass(*object) && object->getFlags() == 0) {
- object->setFlags(1);
- if (object->getConfigChanged() || object->getInstChanged())
- object->setUpdateTime();
-
- send_props = (object->getConfigChanged() || object->getForcePublish() || object->isDeleted());
- send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish()));
-
- if (send_stats || send_props) {
- Variant::Map map_;
- Variant::Map values;
- Variant::Map oid;
+ bool send_stats, send_props;
+ if (baseObject->isSameClass(*object) && object->getFlags() == 0) {
+ object->setFlags(1);
+ if (object->getConfigChanged() || object->getInstChanged())
+ object->setUpdateTime();
- object->getObjectId().mapEncode(oid);
- map_["_object_id"] = oid;
- map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
- object->getClassName(),
- object->getMd5Sum());
- object->writeTimestamps(map_);
- object->mapEncodeValues(values, send_props, send_stats);
- map_["_values"] = values;
- list_.push_back(map_);
+ send_props = (object->getConfigChanged() || object->getForcePublish() || object->isDeleted());
+ send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish()));
- if (++v2Objs >= maxV2ReplyObjs) {
- v2Objs = 0;
- boost::shared_ptr<MessageItem> item(new MessageItem(headers, addr_key.str()));
- ListCodec::encode(list_, item->content);
- message_list.push_back(item);
- list_.clear();
- }
+ if (send_stats || send_props) {
+ Variant::Map map_;
+ Variant::Map values;
+ Variant::Map oid;
+
+ object->getObjectId().mapEncode(oid);
+ map_["_object_id"] = oid;
+ map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+ object->getClassName(),
+ object->getMd5Sum());
+ object->writeTimestamps(map_);
+ object->mapEncodeValues(values, send_props, send_stats);
+ map_["_values"] = values;
+ list_.push_back(map_);
+
+ if (++v2Objs >= maxV2ReplyObjs) {
+ v2Objs = 0;
+ ListCodec::encode(list_, content);
+
+ connThreadBody.sendBuffer(content, "", headers, topicExchange, addr_key.str(), "amqp/list");
+ list_.clear();
+ content.clear();
+ QPID_LOG(trace, "SENT DataIndication");
}
-
- if (object->isDeleted())
- deleteList.push_back(iter->first);
- object->setForcePublish(false);
}
- }
- if (!list_.empty()) {
- boost::shared_ptr<MessageItem> item(new MessageItem(headers, addr_key.str()));
- ListCodec::encode(list_, item->content);
- message_list.push_back(item);
+ if (object->isDeleted())
+ deleteList.push_back(iter->first);
+ object->setForcePublish(false);
}
}
- // Delete flagged objects
- for (list<ObjectId>::reverse_iterator iter = deleteList.rbegin();
- iter != deleteList.rend();
- iter++)
- managementObjects.erase(*iter);
+ if (!list_.empty()) {
+ ListCodec::encode(list_, content);
+ connThreadBody.sendBuffer(content, "", headers, topicExchange, addr_key.str(), "amqp/list");
+ QPID_LOG(trace, "SENT DataIndication");
+ }
}
- while (!message_list.empty()) {
- boost::shared_ptr<MessageItem> item(message_list.front());
- message_list.pop_front();
- connThreadBody.sendBuffer(item->content, "", item->headers, topicExchange, item->key, "amqp/list");
- QPID_LOG(trace, "SENT DataIndication");
- }
+ // Delete flagged objects
+ for (list<ObjectId>::reverse_iterator iter = deleteList.rbegin();
+ iter != deleteList.rend();
+ iter++)
+ managementObjects.erase(*iter);
}
@@ -1378,26 +1364,13 @@ bool ManagementAgentImpl::ConnectionThre
void ManagementAgentImpl::PublishThread::run()
{
- uint16_t totalSleep;
- uint16_t sleepTime;
+ uint16_t totalSleep;
while (!shutdown) {
agent.periodicProcessing();
totalSleep = 0;
-
- //
- // Calculate a sleep time that is no greater than 5 seconds and
- // no less than 1 second.
- //
- sleepTime = agent.getInterval();
- if (sleepTime > 5)
- sleepTime = 5;
- else if (sleepTime == 0)
- sleepTime = 1;
-
- while (totalSleep < agent.getInterval() && !shutdown) {
- ::sleep(sleepTime);
- totalSleep += sleepTime;
+ while (totalSleep++ < agent.getInterval() && !shutdown) {
+ ::sleep(1);
}
}
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/agent/ManagementAgentImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/agent/ManagementAgentImpl.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/agent/ManagementAgentImpl.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/agent/ManagementAgentImpl.h Fri Oct 21 01:19:00 2011
@@ -62,8 +62,8 @@ class ManagementAgentImpl : public Manag
uint16_t intervalSeconds = 10,
bool useExternalThread = false,
const std::string& storeFile = "",
- const std::string& uid = "",
- const std::string& pwd = "",
+ const std::string& uid = "guest",
+ const std::string& pwd = "guest",
const std::string& mech = "PLAIN",
const std::string& proto = "tcp");
void init(const management::ConnectionSettings& settings,
Modified: qpid/branches/QPID-2519/cpp/src/qpid/amqp_0_10/Codecs.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/amqp_0_10/Codecs.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/amqp_0_10/Codecs.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/amqp_0_10/Codecs.cpp Fri Oct 21 01:19:00 2011
@@ -127,10 +127,10 @@ Variant toVariant(boost::shared_ptr<Fiel
switch (in->getType()) {
//Fixed Width types:
case 0x01: out.setEncoding(amqp0_10_binary);
- case 0x02: out = in->getIntegerValue<int8_t>(); break;
- case 0x03: out = in->getIntegerValue<uint8_t>(); break;
+ case 0x02: out = in->getIntegerValue<int8_t, 1>(); break;
+ case 0x03: out = in->getIntegerValue<uint8_t, 1>(); break;
case 0x04: break; //TODO: iso-8859-15 char
- case 0x08: out = static_cast<bool>(in->getIntegerValue<uint8_t>()); break;
+ case 0x08: out = static_cast<bool>(in->getIntegerValue<uint8_t, 1>()); break;
case 0x10: out.setEncoding(amqp0_10_binary);
case 0x11: out = in->getIntegerValue<int16_t, 2>(); break;
case 0x12: out = in->getIntegerValue<uint16_t, 2>(); break;
Modified: qpid/branches/QPID-2519/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/amqp_0_10/SessionHandler.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/amqp_0_10/SessionHandler.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/amqp_0_10/SessionHandler.cpp Fri Oct 21 01:19:00 2011
@@ -130,6 +130,9 @@ void SessionHandler::handleException(con
}
namespace {
+bool isControl(const AMQFrame& f) {
+ return f.getMethod() && f.getMethod()->type() == framing::SEGMENT_TYPE_CONTROL;
+}
bool isCommand(const AMQFrame& f) {
return f.getMethod() && f.getMethod()->type() == framing::SEGMENT_TYPE_COMMAND;
}
@@ -188,10 +191,9 @@ void SessionHandler::detach(const std::s
void SessionHandler::detached(const std::string& name, uint8_t code) {
CHECK_NAME(name, "session.detached");
awaitingDetached = false;
- if (code != session::DETACH_CODE_NORMAL) {
- sendReady = receiveReady = false;
+ if (code != session::DETACH_CODE_NORMAL)
channelException(convert(code), "session.detached from peer.");
- } else {
+ else {
handleDetach();
}
}
Modified: qpid/branches/QPID-2519/cpp/src/qpid/amqp_0_10/SessionHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/amqp_0_10/SessionHandler.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/amqp_0_10/SessionHandler.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/amqp_0_10/SessionHandler.h Fri Oct 21 01:19:00 2011
@@ -41,8 +41,8 @@ namespace amqp_0_10 {
* to a session state.
*/
-class QPID_COMMON_CLASS_EXTERN SessionHandler : public framing::AMQP_AllOperations::SessionHandler,
- public framing::FrameHandler::InOutHandler
+class SessionHandler : public framing::AMQP_AllOperations::SessionHandler,
+ public framing::FrameHandler::InOutHandler
{
public:
QPID_COMMON_EXTERN SessionHandler(framing::FrameHandler* out=0, uint16_t channel=0);
@@ -66,7 +66,7 @@ class QPID_COMMON_CLASS_EXTERN SessionHa
QPID_COMMON_EXTERN void handleException(const qpid::SessionException& e);
/** True if the handler is ready to send and receive */
- QPID_COMMON_EXTERN bool ready() const;
+ bool ready() const;
// Protocol methods
QPID_COMMON_EXTERN void attach(const std::string& name, bool force);
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/Bridge.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/Bridge.cpp?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/Bridge.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/Bridge.cpp Fri Oct 21 01:19:00 2011
@@ -164,12 +164,6 @@ void Bridge::destroy()
listener(this);
}
-bool Bridge::isSessionReady() const
-{
- SessionHandler& sessionHandler = conn->getChannel(id);
- return sessionHandler.ready();
-}
-
void Bridge::setPersistenceId(uint64_t pId) const
{
persistenceId = pId;
Modified: qpid/branches/QPID-2519/cpp/src/qpid/broker/Bridge.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/qpid/broker/Bridge.h?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/qpid/broker/Bridge.h (original)
+++ qpid/branches/QPID-2519/cpp/src/qpid/broker/Bridge.h Fri Oct 21 01:19:00 2011
@@ -59,8 +59,6 @@ public:
void destroy();
bool isDurable() { return args.i_durable; }
- bool isSessionReady() const;
-
management::ManagementObject* GetManagementObject() const;
management::Manageable::status_t ManagementMethod(uint32_t methodId,
management::Args& args,
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org