You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2011/08/15 18:47:57 UTC
svn commit: r1157907 - in /qpid/trunk/qpid/cpp: include/qmf/AgentSession.h
include/qmf/ConsoleSession.h src/qmf/AgentSession.cpp
src/qmf/ConsoleSession.cpp src/qmf/ConsoleSessionImpl.h
src/qpid/agent/ManagementAgentImpl.cpp
Author: tross
Date: Mon Aug 15 16:47:56 2011
New Revision: 1157907
URL: http://svn.apache.org/viewvc?rev=1157907&view=rev
Log:
QPID-3423 - Timing and Performance Improvements in QMF Libraries
Modified:
qpid/trunk/qpid/cpp/include/qmf/AgentSession.h
qpid/trunk/qpid/cpp/include/qmf/ConsoleSession.h
qpid/trunk/qpid/cpp/src/qmf/AgentSession.cpp
qpid/trunk/qpid/cpp/src/qmf/ConsoleSession.cpp
qpid/trunk/qpid/cpp/src/qmf/ConsoleSessionImpl.h
qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
Modified: qpid/trunk/qpid/cpp/include/qmf/AgentSession.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qmf/AgentSession.h?rev=1157907&r1=1157906&r2=1157907&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qmf/AgentSession.h (original)
+++ qpid/trunk/qpid/cpp/include/qmf/AgentSession.h Mon Aug 15 16:47:56 2011
@@ -71,6 +71,11 @@ namespace qmf {
* If False: Listen only on the routable direct address
* strict-security:{True,False} - If True: Cooperate with the broker to enforce strict access control to the network
* - If False: Operate more flexibly with regard to use of messaging facilities [default]
+ * max-thread-wait-time:N - Time (in seconds) the session thread will wait for messages from the network between
+ * periodic background processing passes. [default: 5]
+ * Values greater than 'interval' are clamped to 'interval'. Larger numbers will cause fewer wake-ups but will
+ * increase the time it takes to shut down the process. This setting will not affect the
+ * agent's response time for queries or method invocation.
*/
QMF_EXTERN AgentSession(qpid::messaging::Connection& conn, const std::string& options="");
Modified: qpid/trunk/qpid/cpp/include/qmf/ConsoleSession.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qmf/ConsoleSession.h?rev=1157907&r1=1157906&r2=1157907&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qmf/ConsoleSession.h (original)
+++ qpid/trunk/qpid/cpp/include/qmf/ConsoleSession.h Mon Aug 15 16:47:56 2011
@@ -61,6 +61,10 @@ namespace qmf {
* If False: Listen only on the routable direct address
* strict-security:{True,False} - If True: Cooperate with the broker to enforce strict access control to the network
* - If False: Operate more flexibly with regard to use of messaging facilities [default]
+ * max-thread-wait-time:N - Time (in seconds) the session thread will wait for messages from the network between
+ * periodic background processing passes.
+ * Values greater than 60 are clamped to 60. Larger numbers will cause fewer wake-ups but will
+ * increase the time it takes to shut down the process. [default: 5]
*/
QMF_EXTERN ConsoleSession(qpid::messaging::Connection& conn, const std::string& options="");
Modified: qpid/trunk/qpid/cpp/src/qmf/AgentSession.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/AgentSession.cpp?rev=1157907&r1=1157906&r2=1157907&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/AgentSession.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qmf/AgentSession.cpp Mon Aug 15 16:47:56 2011
@@ -120,6 +120,7 @@ namespace qmf {
bool publicEvents;
bool listenOnDirect;
bool strictSecurity;
+ uint32_t maxThreadWaitTime;
uint64_t schemaUpdateTime;
string directBase;
string topicBase;
@@ -185,7 +186,7 @@ AgentSessionImpl::AgentSessionImpl(Conne
bootSequence(1), interval(60), lastHeartbeat(0), lastVisit(0), forceHeartbeat(false),
externalStorage(false), autoAllowQueries(true), autoAllowMethods(true),
maxSubscriptions(64), minSubInterval(3000), subLifetime(300), publicEvents(true),
- listenOnDirect(true), strictSecurity(false),
+ listenOnDirect(true), strictSecurity(false), maxThreadWaitTime(5),
schemaUpdateTime(uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())))
{
//
@@ -246,7 +247,14 @@ AgentSessionImpl::AgentSessionImpl(Conne
iter = optMap.find("strict-security");
if (iter != optMap.end())
strictSecurity = iter->second.asBool();
+
+ iter = optMap.find("max-thread-wait-time");
+ if (iter != optMap.end())
+ maxThreadWaitTime = iter->second.asUint32();
}
+
+ if (maxThreadWaitTime > interval)
+ maxThreadWaitTime = interval;
}
@@ -254,6 +262,11 @@ AgentSessionImpl::~AgentSessionImpl()
{
if (opened)
close();
+
+ if (thread) {
+ thread->join();
+ delete thread;
+ }
}
@@ -262,6 +275,12 @@ void AgentSessionImpl::open()
if (opened)
throw QmfException("The session is already open");
+ // If the thread exists, join and delete it before creating a new one.
+ if (thread) {
+ thread->join();
+ delete thread;
+ }
+
const string addrArgs(";{create:never,node:{type:topic}}");
const string routableAddr("direct-agent.route." + qpid::types::Uuid(true).str());
attributes["_direct_subject"] = routableAddr;
@@ -304,13 +323,8 @@ void AgentSessionImpl::close()
if (!opened)
return;
- // Stop and join the receiver thread
+ // Stop the receiver thread. Don't join it until the destructor is called or open() is called.
threadCanceled = true;
- thread->join();
- delete thread;
-
- // Close the AMQP session
- session.close();
opened = false;
}
@@ -320,9 +334,13 @@ bool AgentSessionImpl::nextEvent(AgentEv
uint64_t milliseconds = timeout.getMilliseconds();
qpid::sys::Mutex::ScopedLock l(lock);
- if (eventQueue.empty() && milliseconds > 0)
- cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(),
- qpid::sys::Duration(milliseconds * qpid::sys::TIME_MSEC)));
+ if (eventQueue.empty() && milliseconds > 0) {
+ int64_t nsecs(qpid::sys::TIME_INFINITE);
+ if ((uint64_t)(nsecs / 1000000) > milliseconds)
+ nsecs = (int64_t) milliseconds * 1000000;
+ qpid::sys::Duration then(nsecs);
+ cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), then));
+ }
if (!eventQueue.empty()) {
event = eventQueue.front();
@@ -1050,7 +1068,7 @@ void AgentSessionImpl::run()
periodicProcessing((uint64_t) qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()) / qpid::sys::TIME_SEC);
Receiver rx;
- bool valid = session.nextReceiver(rx, Duration::SECOND);
+ bool valid = session.nextReceiver(rx, Duration::SECOND * maxThreadWaitTime);
if (threadCanceled)
break;
if (valid) {
@@ -1067,6 +1085,7 @@ void AgentSessionImpl::run()
enqueueEvent(AgentEvent(new AgentEventImpl(AGENT_THREAD_FAILED)));
}
+ session.close();
QPID_LOG(debug, "AgentSession thread exiting for agent " << agentName);
}
Modified: qpid/trunk/qpid/cpp/src/qmf/ConsoleSession.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/ConsoleSession.cpp?rev=1157907&r1=1157906&r2=1157907&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/ConsoleSession.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qmf/ConsoleSession.cpp Mon Aug 15 16:47:56 2011
@@ -66,7 +66,7 @@ Subscription ConsoleSession::subscribe(c
//========================================================================================
ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) :
- connection(c), domain("default"), maxAgentAgeMinutes(5), listenOnDirect(true), strictSecurity(false),
+ connection(c), domain("default"), maxAgentAgeMinutes(5), listenOnDirect(true), strictSecurity(false), maxThreadWaitTime(5),
opened(false), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0),
connectedBrokerInAgentList(false), schemaCache(new SchemaCache()), nextCorrelator(1)
{
@@ -92,7 +92,14 @@ ConsoleSessionImpl::ConsoleSessionImpl(C
iter = optMap.find("strict-security");
if (iter != optMap.end())
strictSecurity = iter->second.asBool();
+
+ iter = optMap.find("max-thread-wait-time");
+ if (iter != optMap.end())
+ maxThreadWaitTime = iter->second.asUint32();
}
+
+ if (maxThreadWaitTime > 60)
+ maxThreadWaitTime = 60;
}
@@ -100,6 +107,11 @@ ConsoleSessionImpl::~ConsoleSessionImpl(
{
if (opened)
close();
+
+ if (thread) {
+ thread->join();
+ delete thread;
+ }
}
@@ -154,6 +166,12 @@ void ConsoleSessionImpl::open()
if (opened)
throw QmfException("The session is already open");
+ // If the thread exists, join and delete it before creating a new one.
+ if (thread) {
+ thread->join();
+ delete thread;
+ }
+
// Establish messaging addresses
directBase = "qmf." + domain + ".direct";
topicBase = "qmf." + domain + ".topic";
@@ -182,14 +200,13 @@ void ConsoleSessionImpl::open()
// Start the receiver thread
threadCanceled = false;
+ opened = true;
thread = new qpid::sys::Thread(*this);
// Send an agent_locate to direct address 'broker' to identify the connected-broker-agent.
sendBrokerLocate();
if (agentQuery)
sendAgentLocate();
-
- opened = true;
}
@@ -198,13 +215,8 @@ void ConsoleSessionImpl::close()
if (!opened)
throw QmfException("The session is already closed");
- // Stop and join the receiver thread
+ // Stop the receiver thread. Don't join it until the destructor is called or open() is called.
threadCanceled = true;
- thread->join();
- delete thread;
-
- // Close the AMQP session
- session.close();
opened = false;
}
@@ -214,9 +226,13 @@ bool ConsoleSessionImpl::nextEvent(Conso
uint64_t milliseconds = timeout.getMilliseconds();
qpid::sys::Mutex::ScopedLock l(lock);
- if (eventQueue.empty() && milliseconds > 0)
- cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(),
- qpid::sys::Duration(milliseconds * qpid::sys::TIME_MSEC)));
+ if (eventQueue.empty() && milliseconds > 0) {
+ int64_t nsecs(qpid::sys::TIME_INFINITE);
+ if ((uint64_t)(nsecs / 1000000) > milliseconds)
+ nsecs = (int64_t) milliseconds * 1000000;
+ qpid::sys::Duration then(nsecs);
+ cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), then));
+ }
if (!eventQueue.empty()) {
event = eventQueue.front();
@@ -596,7 +612,7 @@ void ConsoleSessionImpl::run()
qpid::sys::TIME_SEC);
Receiver rx;
- bool valid = session.nextReceiver(rx, Duration::SECOND);
+ bool valid = session.nextReceiver(rx, Duration::SECOND * maxThreadWaitTime);
if (threadCanceled)
break;
if (valid) {
@@ -613,6 +629,7 @@ void ConsoleSessionImpl::run()
enqueueEvent(ConsoleEvent(new ConsoleEventImpl(CONSOLE_THREAD_FAILED)));
}
+ session.close();
QPID_LOG(debug, "ConsoleSession thread exiting");
}
Modified: qpid/trunk/qpid/cpp/src/qmf/ConsoleSessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/ConsoleSessionImpl.h?rev=1157907&r1=1157906&r2=1157907&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/ConsoleSessionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qmf/ConsoleSessionImpl.h Mon Aug 15 16:47:56 2011
@@ -76,6 +76,7 @@ namespace qmf {
uint32_t maxAgentAgeMinutes;
bool listenOnDirect;
bool strictSecurity;
+ uint32_t maxThreadWaitTime;
Query agentQuery;
bool opened;
std::queue<ConsoleEvent> eventQueue;
Modified: qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=1157907&r1=1157906&r2=1157907&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Mon Aug 15 16:47:56 2011
@@ -1378,13 +1378,26 @@ bool ManagementAgentImpl::ConnectionThre
void ManagementAgentImpl::PublishThread::run()
{
- uint16_t totalSleep;
+ uint16_t totalSleep;
+ uint16_t sleepTime;
while (!shutdown) {
agent.periodicProcessing();
totalSleep = 0;
- while (totalSleep++ < agent.getInterval() && !shutdown) {
- ::sleep(1);
+
+ //
+ // Calculate a sleep time that is no greater than 5 seconds and
+ // no less than 1 second.
+ //
+ sleepTime = agent.getInterval();
+ if (sleepTime > 5)
+ sleepTime = 5;
+ else if (sleepTime == 0)
+ sleepTime = 1;
+
+ while (totalSleep < agent.getInterval() && !shutdown) {
+ ::sleep(sleepTime);
+ totalSleep += sleepTime;
}
}
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org